Option name | Type | Description |
---|---|---|
config | Object,Function,String | Stage configuration |
The Stage class, core of the pipeline.js
events:
error
error while executing stage
done
resulting context of staging
end
signals that stage execution is complete
General Stage definition
config
as Object
:
config
as Function
config
is the run
method of the stage
config
as String
is the name
/**
 * Stage constructor.
 *
 * `config` may be:
 *  - an Object: its `ensure`, `rescue`, `validate` and `run` members are
 *    copied onto the stage when they are functions; an object `schema` is
 *    passed to `schema(...)` and the result becomes `validate`; `name`
 *    becomes the stage name;
 *  - a Function: it becomes the `run` method of the stage;
 *  - a String: it is the name of the stage.
 *
 * When no name is given, the name is taken from the `run` function: its
 * declared identifier if it has one, otherwise its full source text. If
 * there is no `run` function either, the name is left unset.
 *
 * @param {Object|Function|String} [config] stage configuration
 * @throws {Error} when invoked without `new`
 * @throws {Error} when both `validate` and `schema` are supplied
 */
function Stage(config) {
  var self = this;
  if (!(self instanceof Stage)) {
    throw new Error('constructor is not a function');
  }
  if (config) {
    if (typeof config === 'object') {
      if (typeof config.ensure === 'function') {
        self.ensure = config.ensure;
      }
      if (typeof config.rescue === 'function') {
        self.rescue = config.rescue;
      }
      if (config.validate && config.schema) {
        throw new Error('use either validate or schema');
      }
      if (typeof config.validate === 'function') {
        self.validate = config.validate;
      }
      if (typeof config.schema === 'object') {
        self.validate = schema(config.schema);
      }
      if (typeof config.run === 'function') {
        self.run = config.run;
      }
    } else if (typeof config === 'function') {
      self.run = config;
    }

    if (typeof config === 'string') {
      self.name = config;
    } else if (config.name) {
      self.name = config.name;
    } else if (typeof self.run === 'function') {
      // Only derive a name from `run` when it really is a function;
      // otherwise a non-function placeholder would be stringified into
      // a meaningless name (or throw if `run` is undefined).
      var source = self.run.toString();
      var match = source.match(/function\s*(\w+)\s*\(/);
      self.name = (match && match[1]) ? match[1] : source;
    }
  }
  EventEmitter.call(self);
}
Provides a way to get the stage name for reports; used for tracing
/**
 * Builds the label that identifies this stage in reports and traces.
 *
 * @returns {String} 'STG:' alone when the stage has no (truthy) name,
 *   otherwise 'STG:' followed by a space and the name
 */
Stage.prototype.reportName = function() {
  var label = 'STG:';
  if (this.name) {
    label += ' ' + this.name;
  }
  return label;
};
Option name | Type | Description |
---|---|---|
context | Context | |
callback | Function |
Ensures context validity
this can be overridden by user
in sync or async way
in sync way it has signature function(context):Error
so it must return error if context is invalid
sync signature
/**
 * Default context check, built on top of `validate`.
 *
 * The result of `this.validate(context)` decides what the callback gets:
 *  - falsy result        -> a new "Context is invalid" Error
 *  - boolean `true`      -> null (context accepted)
 *  - any other truthy    -> that value itself (e.g. a custom Error)
 *
 * @param {Context} context context to check
 * @param {Function} callback receives the error, or null when valid
 */
Stage.prototype.ensure = function(context, callback) {
  var verdict = this.validate(context);
  if (!verdict) {
    callback(new Error(this.reportName() + ' reports: Context is invalid'));
    return;
  }
  callback(typeof verdict === 'boolean' ? null : verdict);
};
internal storage for name
// Prototype-level default for the stage name (unset). The constructor
// assigns a per-instance `name` when it can determine one from its config.
Stage.prototype.name = undefined;
Option name | Type | Description |
---|---|---|
validatee | Context |
default validate
implementation
/**
 * Default validator: accepts every context.
 * Replaced per instance by the constructor when the config supplies
 * `validate` or `schema`.
 *
 * @param {Context} context context to validate (ignored by this default)
 * @returns {Boolean} always true
 */
Stage.prototype.validate = function(context) {
return true;
};
Option name | Type | Description |
---|---|---|
Error|null | err | |
Context | context | |
Function | callback | callback for async rescue process return Error|undefined|null it can be also sync like this
|
All-purpose Error handler for stage
it must return an Error or undefined,
by default it returns the error but one can override it with some business logic
/**
 * Default error handler: passes the error through unchanged.
 *
 * Works in two modes, chosen by whether `callback` is a function:
 *  - async: calls `callback(err)` and returns undefined
 *  - sync:  returns `err`
 *
 * @param {Error|null} err error raised while executing the stage
 * @param {Context} context current context (unused by this default)
 * @param {Function} [callback] receives the error in async mode
 * @returns {Error|null|undefined} `err` in sync mode, otherwise undefined
 */
Stage.prototype.rescue = function(err, context, callback) {
  if (typeof callback !== 'function') {
    return err;
  }
  callback(err);
};
run function, can be assigned by child class
Signature
function(err, ctx, done) -- async with custom error handler!
NOTE: All errors must be handled within the function.
function(ctx, done) -- async
function(ctx) -- sync call
function() -- sync call; context
is applied as `this` for the function.
// Placeholder meaning "no run method set". execute() checks that `run` is a
// function and otherwise reports a 'run is not a function' error, so a stage
// must have `run` replaced (via config or by a child class) to do any work.
Stage.prototype.run = 0;
var failproofSyncCall = require('./util.js').failproofSyncCall;
var failproofAsyncCall = require('./util.js').failproofAsyncCall;
Option name | Type | Description |
---|---|---|
Context|Object | _context | incoming context |
Function | callback | incoming callback function function(err, ctx) |
executes stage and return result to callback
always async
/**
 * Executes the stage against a context and reports the outcome.
 *
 * Steps:
 *  1. The incoming context is normalized with `Context.ensure`.
 *  2. `self.ensure` is invoked; its arity selects the calling convention
 *     (2 = async with callback, 1 = sync returning an error or nothing).
 *  3. `self.run` is invoked; its arity selects the calling convention:
 *       0 -> run()                    sync, context bound as `this`
 *       1 -> run(context)             sync
 *       2 -> run(context, done)       async
 *       3 -> run(err, context, done)  async, also receives ensure errors
 *  4. Any error is routed through `self.rescue` (again selected by arity);
 *     whatever rescue yields is emitted as 'error' (only if listeners are
 *     registered) and passed to the callback.
 *
 * Events: 'done' is emitted on success, 'end' is emitted right before the
 * callback is invoked on every completion path.
 *
 * @param {Context|Object} _context incoming context
 * @param {Function} [callback] function(err, ctx) invoked on completion
 */
Stage.prototype.execute = function(_context, callback) {
  var self = this;
  var context = Context.ensure(_context);
  var hasCallback = typeof callback === 'function';

  // Last step of every path: announce completion, then hand the result over.
  function finishIt(err) {
    self.emit('end', context);
    if (hasCallback) {
      callback(err, context);
    }
  }

  // Routes an error through the stage's rescue handler before finishing.
  function handleError(_err) {
    // Promote plain string errors to Error objects; other non-Error values
    // are passed to rescue untouched.
    if (_err && !(_err instanceof Error)) {
      if (typeof _err === 'string') {
        _err = Error(_err);
      }
    }

    function processError(err) {
      if (err) {
        if (self.listeners('error').length > 0) {
          // Since this code runs inside a domain, emitting 'error'
          // without a listener would cause recursion...
          self.emit('error', err, context);
        }
      }
      finishIt(err);
    }

    // The arity of rescue decides how it is called.
    switch (self.rescue.length) {
      case 0:
        processError(self.rescue());
        break;
      case 1:
        processError(self.rescue(_err));
        break;
      case 2:
        processError(self.rescue(_err, context));
        break;
      case 3:
        self.rescue(_err, context, processError);
        break;
      default:
        processError(_err);
    }
  }

  // NOTE(review): the failproof* helpers come from ./util.js and are not
  // visible here; presumably they wrap a call so that thrown exceptions
  // reach handleError — confirm against util.js.
  var ensureAsync = failproofAsyncCall.bind(undefined, handleError);
  var ensureSync = failproofSyncCall.bind(undefined, handleError);

  // Completion callback handed to `run`.
  var doneIt = function(err) {
    if (err) {
      handleError(err);
    } else {
      self.emit('done', context);
      finishIt();
    }
  };

  // Declared with `var`: assigning an undeclared name would create a global
  // shared by all executions and throws a ReferenceError in strict mode.
  var runStage = function(err) {
    if (typeof self.run !== 'function') {
      handleError(new Error(self.reportName() + ' reports: run is not a function'));
      return;
    }

    if (err) {
      // Only a three-argument run opts in to handling ensure errors itself.
      if (self.run.length >= 3) {
        ensureAsync(self, self.run)(err, context, doneIt);
      } else {
        handleError(err);
      }
      return;
    }

    switch (self.run.length) {
      case 0:
        ensureSync(context, self.run, doneIt)();
        break;
      case 1:
        ensureSync(self, self.run, doneIt)(context);
        break;
      case 2:
        ensureAsync(self, self.run)(context, doneIt);
        break;
      case 3:
        ensureAsync(self, self.run)(null, context, doneIt);
        break;
      default:
        handleError(new Error('unacceptable signature'));
    }
  };

  switch (self.ensure.length) {
    case 2:
      self.ensure(context, runStage);
      break;
    case 1:
      runStage(self.ensure(context));
      break;
    default:
      handleError(new Error('unknown ensure signature'));
  }
};