pipeline.js

Parallel

function
Parallel()

Option name Type Description
config Object

configuration object

Processes a stage in parallel.

config as Object

  • stage
      evaluating stage
    
  • split
      function that splits the existing context into smaller parts, if needed
    
  • combine
      if the results need to be combined, this can be used to combine the split parts and update the context
    

Note
split does not require combine; without it the parent context is returned.
If no split is configured, the default one is used.

/**
 * Stage that runs one inner stage over several child contexts.
 *
 * Accepted `config` forms, as handled below:
 *   - an object with `stage` (a Stage instance or a function), and optional
 *     `split`, `combine` and `name`;
 *   - an object with `run` (a function): it is wrapped into a Stage, stored
 *     as `config.stage`, and `config.run` is deleted from the passed object;
 *   - a Stage instance, used directly as the inner stage;
 *   - nothing: an Empty stage is used.
 *
 * @param {Object|Stage} [config] configuration object or inner stage
 * @throws {Error} when called without `new`
 */
function Parallel(config) {
	if (!(this instanceof Parallel)) {
		throw new Error('constructor is not a function');
	}

	// `run` shortcut: must be converted before the base constructor sees it.
	var hasRunShortcut = Boolean(config) && config.run instanceof Function;
	if (hasRunShortcut) {
		config.stage = new Stage(config.run);
		delete config.run;
	}

	// Base constructor receives the original argument list untouched.
	Stage.apply(this, arguments);

	// Normalise whatever was passed into a plain options object.
	var options;
	if (config instanceof Stage) {
		options = {
			stage: config
		};
	} else {
		options = config || {};
	}

	var inner = options.stage;
	if (inner instanceof Stage) {
		this.stage = inner;
	} else if (inner instanceof Function) {
		this.stage = new Stage(inner);
	} else {
		this.stage = new Empty();
	}

	// Only function-valued hooks override the prototype defaults.
	if (options.split instanceof Function) {
		this.split = options.split;
	}
	if (options.combine instanceof Function) {
		this.combine = options.combine;
	}

	this.name = options.name;
}

stage

property
Parallel.prototype.stage

internal declaration for stage

/**
 * Prototype-level placeholder for the inner stage.
 * The constructor assigns the real value on each instance; the run handler
 * built by compile() executes it once per child context.
 */
Parallel.prototype.stage = undefined;

split

method
Parallel.prototype.split()

internal declaration for split

/**
 * Default split strategy: no actual splitting is performed.
 * The given context becomes the single element of the returned list.
 *
 * @param {*} ctx context to split
 * @returns {Array} a new one-element array holding `ctx`
 */
Parallel.prototype.split = function(ctx) {
	var parts = [];
	parts.push(ctx);
	return parts;
};

combine

method
Parallel.prototype.combine()

Option name Type Description
ctx Context

main context

children Array.<Context>

list of all children contexts

internal declaration for combine

/**
 * Default combine hook: intentionally a no-op.
 * The run handler built by compile() calls it with the parent context and
 * the list of child contexts once every child has finished without errors.
 * The constructor replaces it when `config.combine` is a function.
 *
 * @param {Context} ctx main context
 * @param {Array.<Context>} children list of all children contexts
 */
Parallel.prototype.combine = function(ctx, children) {
};

reportName

method
Parallel.prototype.reportName()

override of reportName

/**
 * Name used when reporting on this stage.
 *
 * @returns {string} the stage name prefixed with "PLL:"
 */
Parallel.prototype.reportName = function() {
	var prefix = "PLL:";
	return prefix + this.name;
};

compile

method
Parallel.prototype.compile()

override of compile
split all and run all

/**
 * Builds the `run` handler for this stage and stores it on `self.run`.
 *
 * The handler splits the context with `self.split`, executes `self.stage`
 * once per child (each child is passed through `ctx.ensureIsChild` first),
 * and, when every child has reported back:
 *   - calls `done(new ErrorList(errors))` if any child failed, or
 *   - calls `self.combine(ctx, children)` followed by `done()` otherwise.
 *
 * If no name is set, the inner stage's `reportName()` is used as the name.
 *
 * Each child's callback is honoured only once. A stage that invokes its
 * callback repeatedly can no longer finish the run early or make `done`
 * fire several times.
 */
Parallel.prototype.compile = function() {
	var self = this;
	if (!self.name) {
		self.name = self.stage.reportName();
	}
	// `err` is part of the handler signature but is not inspected here.
	var run = function(err, ctx, done) {
		var children = self.split(ctx);
		var len = children ? children.length : 0;
		var pending = len; // children that have not reported back yet
		var settled = []; // settled[i] is true once child i has reported
		var errors = [];

		// Reports the overall outcome once all children are accounted for.
		function finish() {
			if (errors.length > 0) {
				done(new ErrorList(errors));
			} else {
				self.combine(ctx, children);
				done();
			}
		}

		// Records a failed child together with the context it was given.
		function logError(err, index) {
			errors.push({
				stage: self.name,
				index: index,
				err: err,
				stack: err.stack,
				ctx: children[index]
			});
		}

		// Creates the completion callback for the child at `index`.
		var next = function(index) {
			return function(err, retCtx) {
				if (settled[index]) {
					// Repeated callback from the same child: ignore it so the
					// completion count stays accurate.
					return;
				}
				settled[index] = true;
				if (err) {
					logError(err, index);
				} else {
					// Successful children are replaced by the returned context.
					children[index] = retCtx;
				}
				pending--;
				if (pending === 0) {
					finish();
				}
			};
		};

		if (len === 0) {
			// Nothing to execute: report immediately.
			finish();
		} else {
			for (var i = 0; i < len; i++) {
				self.stage.execute(ctx.ensureIsChild(children[i]), next(i));
			}
		}
	};
	self.run = run;
};

execute

method
Parallel.prototype.execute()

Option name Type Description
context Context

evaluating context

[callback] Context

returning callback

override of execute

/**
 * Executes the stage, compiling it first when no `run` handler exists yet,
 * then hands the call over to the parent class's `execute` with the same
 * receiver and the complete argument list.
 *
 * @param {Context} context evaluating context
 * @param {Function} [callback] returning callback
 */
Parallel.prototype.execute = function(context, callback) {
	var needsCompile = !this.run;
	if (needsCompile) {
		this.compile();
	}
	var inherited = Parallel.super_.prototype.execute;
	inherited.apply(this, arguments);
};