Option name | Type | Description |
---|---|---|
config | Object,Function,String | Stage configuration |
The Stage class, core of the pipeline.js
this one uses domain
to catch all errors.
- `error`
error whiule executing stage
- `done`
resulting context of staging
- `end`
examine that stage executing is complete
###Configuration
`config` as `Object`:
- ensure
method for ensuring the context
- rescue
method for rescue stage from errors
- validate
validate contect method
- schema
schema validation for context
- run
the method that will be evaluated as worker
###config as Function
`config` is the `run` method of the stage
###config as String
`config` is the `name` of the stage
function Stage(config) {
var self = this;
if (!(self instanceof Stage)) {
throw new Error('constructor is not a function');
}
if (config) {
if (typeof(config) === 'object') {
if (typeof(config.ensure) === 'function') {
self.ensure = config.ensure;
}
if (typeof(config.rescue) === 'function') {
self.rescue = config.rescue;
}
if (config.validate && config.schema) {
throw new Error('use either validate or schema');
}
if (typeof(config.validate) === 'function') {
self.validate = config.validate;
}
if (typeof(config.schema) === 'object') {
self.validate = schema(config.schema);
}
if (typeof(config.run) === 'function') {
self.run = config.run;
}
} else {
if (typeof(config) === 'function') {
self.run = config;
}
}
if (typeof(config) === 'string') {
self.name = config;
} else {
if (config.name) {
self.name = config.name;
} else {
var match = self.run.toString().match(/function\s*(\w+)\s*\(/);
if (match && match[1]) {
self.name = match[1];
} else {
self.name = self.run.toString();
}
}
}
}
EventEmitter.call(self);
}
provaide a way to get stage name for reports used for tracing
Stage.prototype.reportName = function() {
var self = this;
return 'STG:' + (self.name ? (' ' + self.name) : '');
};
Option name | Type | Description |
---|---|---|
context | ontex | |
Function | allbac |
Ensures context validity
this can be overridden by user
in sync or async way
in sync way it has signaturefunction(context):Error
so it must return error if context is invalid
sync signature
Stage.prototype.ensure = function(context, callback) {
var self = this;
var validation = self.validate(context);
if (validation) {
if ('boolean' === typeof validation) {
callback(null);
} else {
callback(validation);
}
} else {
callback(new Error(self.reportName() + ' reports: Context is invalid'));
}
};
internal storage for name
Stage.prototype.name = undefined;
default validate
implementation
Stage.prototype.validate = function(context) {
return true;
};
Option name | Type | Description |
---|---|---|
Error|null | r | |
Context | ontex | |
Function | callback | callback for async rescue process return Error|undefined|null |
Allpurpose Error handler for stage
Stage.prototype.rescue = function(err, context, callback) {
if (typeof callback === 'function') {
callback(err);
} else {
return err;
}
};
run function, can be assigned by child class
Singature
function(err, ctx, done) -- async wit custom error handler! deprecated. err always null.
function(ctx, done) -- async
function(ctx) -- sync call
function() -- sync call context
applyed as this for function.
Stage.prototype.run = 0;
return d.bind(function() {
var args = Array.prototype.slice.call(arguments);
args.unshift(fn);
setImmediate.apply(null, args);
});
}
return d.bind(function() {
var args = Array.prototype.slice.call(arguments);
args.unshift(fn);
setImmediate.apply(null, args);
});
}
Option name | Type | Description |
---|---|---|
Context|Object | contex | incoming context |
Function | allbac | incoming callback function function(err, ctx) |
executes stage and return result to callback
always async
Stage.prototype.execute = function(_context, callback) {
var self = this;
var d = require('domain').create();
var ensureAsync = failproofAsyncCall.bind(undefined, d);
var ensureSync = failproofSyncCall.bind(undefined, d);
d.add(self);
d.on('error', finalizeIt);
var context = Context.ensure(_context);
var hasCallback = typeof(callback) === 'function';
function afterErrorCheck(err) {
if (err) {
context.addError(err);
if (self.listeners('error').length > 0) {
self.emit('error', err, context);
}
} else {
self.emit('done', context);
}
self.emit('end', context);
if (hasCallback) {
d.exit(); // exit from domain;
// setImmediate(function(err, context) {
callback(err, context);
// }, err, context);
}
}
function finalizeIt(_err) {
var err;
if (_err) {
var len = self.rescue.length;
switch (len) {
case 1:
afterErrorCheck(self.rescue(_err));
break;
case 2:
err = self.rescue(_err, context);
afterErrorCheck(err);
break;
case 3:
self.rescue(_err, context, afterErrorCheck);
break;
default:
afterErrorCheck(err);
}
} else {
afterErrorCheck();
}
}
function afterEnsure(err) {
if (!err) { // passing
if (typeof(self.run) == 'function') {
var hasError = null;
switch (self.run.length) {
case 0:
ensureSync(context, self.run, finalizeIt)();
break;
case 1:
ensureSync(self, self.run, finalizeIt)(context);
break;
case 2:
ensureAsync(self, self.run)(context, finalizeIt);
break;
case 3:
ensureAsync(self, self.run)(null, context, finalizeIt);
break;
default:
ensureAsync(self, finalizeIt(new Error('unacceptable signature')));
}
} else {
ensureAsync(self, finalizeIt(new Error(self.reportName() + ' reports: run is not a function')));
}
} else {
ensureAsync(self, finalizeIt(err));
}
}
switch (self.ensure.length) {
case 2:
self.ensure(context, afterEnsure);
break;
case 1:
var verr = self.ensure(context);
afterEnsure(verr);
break;
default:
finalizeIt(new Error('unknown ensure signature'));
}
};