Multi-threaded workers processing schema.org Actions concurrently.
Note: this module is automatically published to npm by CircleCI. Only run `npm version patch|minor|major` and let CI do the rest.
Workers operate on schema.org Actions. Readers not familiar with Actions should refer to the schema.org Actions overview document for a quick introduction.
For an API endpoint receiving actions like:

```json
{
  "@context": "http://schema.org",
  "@id-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "@type-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "actionStatus": "PotentialActionStatus",
  "agent-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "object-input": {
    "@type": "PropertyValueSpecification",
    "valueRequired": true,
    "valueName": "objectId"
  },
  "result": {
    "@id-output": {
      "@type": "PropertyValueSpecification",
      "valueRequired": true,
      "valueName": "resultId"
    },
    "@type": "UpdateAction"
  },
  "target": {
    "@type": "EntryPoint",
    "httpMethod": "PUT",
    "urlTemplate": "http://example.com/{objectId}",
    "encodingType": "application/ld+json",
    "contentType": "application/ld+json"
  }
}
```

@scipe/workers provides everything required to create scalable action processing pipelines supporting cancellation and real-time progress events.
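In the example action, `object-input` declares `valueName: "objectId"`, which names the variable that `target.urlTemplate` expects. As a minimal sketch, assuming only simple `{name}` placeholders (RFC 6570 level 1, no operators), a client could fill that template as follows. `expandUrlTemplate` is a hypothetical helper, not part of @scipe/workers.

```javascript
// Hypothetical helper (not part of @scipe/workers): expands simple
// RFC 6570 level-1 placeholders such as {objectId}.
function expandUrlTemplate(template, values) {
  return template.replace(/\{([^{}]+)\}/g, (match, name) => {
    if (!Object.prototype.hasOwnProperty.call(values, name)) {
      throw new Error(`Missing value for template variable "${name}"`);
    }
    // Approximates level-1 percent-encoding; note that encodeURIComponent
    // leaves a few characters such as ! ' ( ) * unencoded.
    return encodeURIComponent(String(values[name]));
  });
}

const potentialAction = {
  target: {
    '@type': 'EntryPoint',
    httpMethod: 'PUT',
    urlTemplate: 'http://example.com/{objectId}'
  }
};

const url = expandUrlTemplate(potentialAction.target.urlTemplate, { objectId: 'abc 123' });
console.log(url); // http://example.com/abc%20123
```

Failing loudly on a missing variable mirrors the `valueRequired: true` annotation: a required input that was not supplied should not silently produce a malformed URL.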
@scipe/workers provides a base `Worker` class. Worker implementors must extend this base class with:
- a `handleAction` method (required)
- a `handleExit` method (optional)
- life cycle methods (`onActiveActionStatus`, `onCompletedActionStatus`, `onCanceledAction`, `onFailedActionStatus`) (optional).
```javascript
import { Worker } from '@scipe/workers';

class CustomWorker extends Worker {
  constructor(config) {
    super(config);
  }

  handleAction(action, callback) {
    // Do work, then call back with (err, handledAction, nextAction)
    callback(err, handledAction, nextAction);
  }

  handleExit(err) {
    // err is an error in case of a crash or a status code in case of a clean exit
    // Do cleanup things like killing child processes
  }

  onActiveActionStatus(action, callback) {
    // Called before the worker starts to emit the first
    // ActiveActionStatus message. Calling the callback with an error will
    // abort the work.
  }

  onCompletedActionStatus(handledAction, callback) {
    // Called if handleAction successfully completed and before emitting the
    // CompletedActionStatus message. Calling the callback with an error will
    // call onFailedActionStatus (passing the error and the handledAction).
  }

  onCanceledAction(action, callback) {
    // Called when the user issues a `CancelAction` targeting `action`.
    // Calling the callback with an error will abort the cancellation.
  }

  onFailedActionStatus(err, action, callback) {
    // Called if handleAction or onCompletedActionStatus failed.
    // Calling the callback with an error with a negative `code`
    // property will trigger the suicide of the worker. After suicide, a new
    // worker node will be automatically respawned.
  }
}
```

Workers are spawned (using the Node.js cluster module) and expose ZeroMQ sockets so that:
- work (action) can be dispatched to the workers.
- workers can notify their progress.
- ongoing work (action) can be canceled.
If the `handleAction` method calls its completion callback with a `nextAction` argument, the next action is dispatched automatically.
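To make the callback contract concrete, here is a standalone sketch of a `handleAction` body written as a plain function so it runs without @scipe/workers. The result `@id` and the shape of the follow-up action are hypothetical placeholders.

```javascript
// Standalone sketch of a handleAction body (normally a method of a class
// extending Worker). The result @id and the follow-up action are
// hypothetical placeholders.
function handleAction(action, callback) {
  if (!action.object) {
    // Plain Error without a negative code: the action fails, the worker survives.
    return callback(new Error('action has no object'));
  }

  const handledAction = Object.assign({}, action, {
    result: { '@id': 'http://example.com/resultId' }
  });

  // Passing a third argument asks the worker to dispatch this action next.
  const nextAction = {
    '@context': 'http://schema.org',
    '@type': 'Action',
    agent: action.agent,
    object: handledAction.result['@id']
  };

  callback(null, handledAction, nextAction);
}

handleAction(
  { '@type': 'Action', agent: 'http://example.com/agentId', object: 'http://example.com/objectId' },
  (err, handledAction, nextAction) => {
    if (err) throw err;
    console.log(nextAction.object); // http://example.com/resultId
  }
);
```

Chaining through `nextAction` keeps each worker focused on a single step, while the pipeline's ordering is expressed in the data rather than in the worker code.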
Errors should be instances of `Error` and may have a `code` property. Errors with a `code` < 0 trigger the suicide of the current worker; after suicide, a new worker node is automatically respawned.
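The error convention can be illustrated with two hypothetical helpers (not part of @scipe/workers): one attaches an optional numeric `code`, the other checks whether an error falls in the "kill and respawn" category. The code values `415` and `-1` below are arbitrary examples.

```javascript
// Hypothetical helpers illustrating the error convention: errors are Error
// instances, optionally carrying a numeric `code`; a negative code means the
// worker process should be killed and respawned.
function createWorkerError(message, code) {
  const err = new Error(message);
  if (code !== undefined) {
    err.code = code;
  }
  return err;
}

function triggersRespawn(err) {
  return err instanceof Error && typeof err.code === 'number' && err.code < 0;
}

// A recoverable failure: the action fails but the worker keeps running.
const recoverable = createWorkerError('unsupported image format', 415);
// A fatal failure, e.g. a wedged child process: the worker should be recycled.
const fatal = createWorkerError('converter process is unresponsive', -1);

console.log(triggersRespawn(recoverable)); // false
console.log(triggersRespawn(fatal)); // true
```

Reserving negative codes for fatal conditions lets a worker fail a single action cheaply while still having a way to recycle a process whose state (for example, a stuck child process) can no longer be trusted.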
Worker:

```javascript
import { Worker } from '@scipe/workers';

class CustomWorker extends Worker {
  constructor(config) {
    super(config);
  }

  handleAction(action, callback) {
    // err, processedAction and nextAction come from your own processing
    callback(err, processedAction, nextAction);
  }

  handleExit(err) {
  }
}

const w = new CustomWorker({ nWorkers: 1 });
w.listen();

w.stop(() => {
  // stopped
});
```

Client:
```javascript
import { Worker } from '@scipe/workers';

const w = new Worker();

w.dispatch({
  '@context': 'http://schema.org',
  '@id': 'http://example.com/actionId',
  '@type': 'Action',
  agent: 'http://example.com/agentId',
  object: 'http://example.com/objectId',
  result: {
    '@id-output': {
      '@type': 'PropertyValueSpecification',
      valueRequired: true,
      valueName: 'resultId'
    }
  },
  target: {
    '@type': 'EntryPoint',
    httpMethod: 'PUT',
    urlTemplate: 'http://example.com/{resultId}',
    encodingType: 'application/ld+json',
    contentType: 'application/ld+json'
  }
}, (err) => {
  // the worker acknowledges the dispatch as soon as it receives the action
});
```

A broker is needed so that the client can reach the worker. The broker also balances the load among the connected workers using a least-recently-used strategy.
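The least-recently-used strategy can be pictured as two FIFO queues: idle workers join the back of a queue when they report READY, and each request goes to the worker at the front, i.e. the one that has been idle the longest. The in-memory `LruBalancer` below is a hypothetical illustration of that idea, not the @scipe/workers `Broker` implementation (which routes over ZeroMQ sockets).

```javascript
// Hypothetical in-memory illustration of least-recently-used (LRU) load
// balancing. Not the @scipe/workers Broker implementation.
class LruBalancer {
  constructor() {
    this.readyWorkers = []; // idle workers, longest-idle first
    this.pendingRequests = []; // requests waiting for a free worker
    this.assignments = []; // [workerId, request] pairs, for inspection
  }

  // Called when a worker reports that it is READY for more work.
  workerReady(workerId) {
    if (this.pendingRequests.length > 0) {
      this.assign(workerId, this.pendingRequests.shift());
    } else {
      this.readyWorkers.push(workerId);
    }
  }

  // Called when a client dispatches a request.
  dispatch(request) {
    if (this.readyWorkers.length > 0) {
      this.assign(this.readyWorkers.shift(), request);
    } else {
      this.pendingRequests.push(request);
    }
  }

  assign(workerId, request) {
    this.assignments.push([workerId, request]);
  }

  stats() {
    return { pending: this.pendingRequests.length, ready: this.readyWorkers.length };
  }
}

const balancer = new LruBalancer();
balancer.workerReady('w1');
balancer.workerReady('w2');
balancer.dispatch('a1'); // goes to w1, idle the longest
balancer.dispatch('a2'); // goes to w2
balancer.dispatch('a3'); // no idle worker: queued as pending
balancer.workerReady('w1'); // w1 finished and immediately picks up a3
console.log(balancer.assignments);
console.log(balancer.stats());
```

Because a busy worker only re-enters the queue after reporting READY again, a slow worker naturally receives less work than a fast one, without the broker having to measure anything.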
Broker:

```javascript
import { Broker } from '@scipe/workers';

const broker = new Broker();

broker.listen(err => {
  if (err) {
    throw err;
  }
});

broker.on('change', (data) => {
  console.log(data);
});
```

The broker is an `EventEmitter` and emits a `change` event that can be tracked to know:
- the number of pending requests
- the number of available workers (in `READY` state).

This data can be used to auto-scale the workers based on the workload.
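A scaling policy based on those two numbers could look like the sketch below. The property names `pending` and `ready` are assumptions about the shape of the `change` payload (check the broker source for the actual names), and the thresholds `minIdle` and `maxStep` are hypothetical tuning knobs.

```javascript
// Hypothetical scaling policy. The field names `pending` and `ready` are
// assumptions about the broker's `change` payload, not documented names.
// Returns how many workers to add (> 0) or remove (< 0).
function scalingDecision({ pending, ready }, { minIdle = 1, maxStep = 4 } = {}) {
  if (pending > 0 && ready === 0) {
    // Work is queuing up and nobody is free: add workers, bounded per step.
    return Math.min(pending, maxStep);
  }
  if (pending === 0 && ready > minIdle) {
    // More idle workers than the reserve we want to keep: remove the surplus.
    return -(ready - minIdle);
  }
  return 0;
}

console.log(scalingDecision({ pending: 10, ready: 0 })); // 4
console.log(scalingDecision({ pending: 0, ready: 3 })); // -2
console.log(scalingDecision({ pending: 2, ready: 1 })); // 0
```

It could be wired as `broker.on('change', data => scale(scalingDecision(data)))`, where `scale` is a hypothetical function standing for whatever mechanism starts or stops worker processes. Capping each step with `maxStep` avoids overreacting to a short burst of requests.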
Cancellation (CancelAction)
Workers subscribe to a ZeroMQ SUB socket, and messages can be sent to this socket to administer the workers. In particular, work related to a given action can be canceled by sending, under the `worker` topic, a `CancelAction` whose `object` is the `@id` of that action.
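The cancellation message itself is plain data: two frames, the topic followed by the JSON-serialized `CancelAction`. A hypothetical helper (not part of @scipe/workers) that builds those frames, independently of any socket, could look like this:

```javascript
// Hypothetical helper (not part of @scipe/workers) building the two-frame
// message [topic, payload] used to cancel the action whose @id is `actionId`.
function buildCancelMessage(actionId, topic = 'worker') {
  const cancelAction = {
    '@type': 'CancelAction',
    actionStatus: 'CompletedActionStatus',
    object: actionId
  };
  return [topic, JSON.stringify(cancelAction)];
}

const [topic, payload] = buildCancelMessage('scipe:actionId');
console.log(topic); // worker
console.log(JSON.parse(payload).object); // scipe:actionId
```

Keeping message construction separate from socket handling makes it easy to test and to reuse from other tools that need to cancel work.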
```javascript
import zmq from 'zmq';

// w is the Worker instance from the examples above
const push = zmq.socket('push');
const topic = 'worker';

const cancelAction = {
  '@type': 'CancelAction',
  actionStatus: 'CompletedActionStatus',
  object: 'scipe:actionId'
};

push.connect(w.PULL_ENDPOINT);
push.send([topic, JSON.stringify(cancelAction)]);
```

Workers publish the status of their work, which can be received through a ZeroMQ SUB socket:
```javascript
import zmq from 'zmq';

const sub = zmq.socket('sub');
sub.connect(w.XPUB_ENDPOINT);
sub.subscribe('');

sub.on('message', function(topic, action) {
  // topic is the action agent['@id']
  // note that topic and action are Buffers
});
```

When a worker starts (and while the job is running), it re-emits the action it received at a regular interval, with an `actionStatus` of `ActiveActionStatus`.
If a user cancels a job, the worker emits the original action with an `actionStatus` of `CanceledActionStatus`.

If a worker fails, it emits the original action with an `actionStatus` of `FailedActionStatus` and an `error` property containing more information on the cause of the failure.

When a worker is done processing an action, it emits the `handledAction` returned by the `handleAction` method, usually with an `actionStatus` of `CompletedActionStatus`.
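On the client side, the statuses above map naturally onto a small decoder for the two Buffer frames received by the SUB socket. The sketch below is hypothetical (not part of @scipe/workers): the returned `state` labels are invented for illustration, and the shape of the `error` object in the usage example is an assumption.

```javascript
// Hypothetical decoder for messages received on the SUB socket. Both frames
// arrive as Buffers: the topic is the action agent @id and the payload is
// the JSON-serialized action (or event).
function decodeStatusMessage(topicBuf, payloadBuf) {
  const agentId = topicBuf.toString();
  const message = JSON.parse(payloadBuf.toString());
  const base = { agentId, id: message['@id'] };

  switch (message.actionStatus) {
    case 'ActiveActionStatus':
      return Object.assign(base, { state: 'running' });
    case 'CanceledActionStatus':
      return Object.assign(base, { state: 'canceled' });
    case 'FailedActionStatus':
      // The error property describes the cause of the failure.
      return Object.assign(base, { state: 'failed', error: message.error });
    case 'CompletedActionStatus':
      return Object.assign(base, { state: 'completed', result: message.result });
    default:
      // Anything without a known actionStatus, e.g. progress events.
      return Object.assign(base, { state: 'other', message });
  }
}

const update = decodeStatusMessage(
  Buffer.from('http://example.com/agentId'),
  Buffer.from(JSON.stringify({
    '@id': 'scipe:actionId',
    actionStatus: 'FailedActionStatus',
    error: { description: 'conversion failed' } // hypothetical error shape
  }))
);
console.log(update.state); // failed
```

Inside the subscription it would be used as `sub.on('message', (topic, action) => render(decodeStatusMessage(topic, action)))`, where `render` is a hypothetical UI callback.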
Within a worker, further information can be published to the ZeroMQ PUB socket by calling the `emitEvent(action, event)` method. Calling `emitEvent` publishes a `ProgressEvent` to the PUB socket. The topic (required by ZeroMQ) is set to the action agent `@id`.

```json
{
  "@context": "http://schema.org",
  "@id": "scipe:eventId",
  "@type": "Event",
  "about": "scipe:actionId",
  "description": "starting to process the action",
  "startDate": "2016-02-29T16:21:32.886Z"
}
```

In addition to publishing the `ProgressEvent`, the `emitEvent` method returns an object with:
- `emitEndedEvent`, a function returning the same `ProgressEvent` as the one emitted by the previous call, but with an added `endDate` property.
- `emitEvent`, returning a new `ProgressEvent` linked to the previous event through the `superEvent` property.
- `toJSON`, a function returning the emitted `ProgressEvent` JavaScript object (note that this function is called by `JSON.stringify`).
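The returned handle is a small chaining pattern. The standalone sketch below re-implements that pattern for illustration only; it is not the library's code. `publish` stands in for the ZeroMQ PUB socket, ids come from a simple counter, and storing `superEvent` as the parent's `@id` (rather than the full parent object) is an assumption.

```javascript
// Hypothetical re-implementation of the emitEvent chaining pattern, for
// illustration only. `publish` stands in for the ZeroMQ PUB socket.
let eventCounter = 0;

function emitProgressEvent(publish, about, description, superEvent) {
  const event = {
    '@context': 'http://schema.org',
    '@id': `scipe:event-${++eventCounter}`,
    '@type': 'Event',
    about,
    description,
    startDate: new Date().toISOString()
  };
  if (superEvent) {
    // Assumption: the link is stored as the parent event's @id.
    event.superEvent = superEvent['@id'];
  }
  publish(event);

  return {
    // Re-publishes the same event with an endDate added.
    emitEndedEvent() {
      const ended = Object.assign({}, event, { endDate: new Date().toISOString() });
      publish(ended);
      return ended;
    },
    // Publishes a child event linked to this one through superEvent.
    emitEvent(childDescription) {
      return emitProgressEvent(publish, about, childDescription, event);
    },
    // Called automatically by JSON.stringify.
    toJSON() {
      return event;
    }
  };
}

const published = [];
const parent = emitProgressEvent(e => published.push(e), 'scipe:actionId', 'starting to process the action');
const child = parent.emitEvent('starting image conversion');
child.emitEndedEvent();
parent.emitEndedEvent();
console.log(published.length); // 4
console.log(JSON.parse(JSON.stringify(child)).superEvent); // scipe:event-1
```

Returning a handle rather than the raw event keeps the parent/child bookkeeping out of `handleAction`: nested steps only need the handle of the step that contains them.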
```javascript
import { Worker } from '@scipe/workers';

class CustomWorker extends Worker {
  handleAction(action, callback) {
    const superEvent = this.emitEvent(action, 'starting to process the action');
    const imageConversionEvent = superEvent.emitEvent('starting image conversion');
    // convert images...
    imageConversionEvent.emitEndedEvent();
    superEvent.emitEndedEvent();
    // err, handledAction and nextAction come from your own processing
    callback(err, handledAction, nextAction);
  }
}
```

Workers can be configured by passing a config object to their constructor (see the worker source code for details).
```javascript
import { ImageWorker, AudioVideoWorker, DocumentWorker } from '@scipe/workers';
```

The `ImageWorker` class extends the `Worker` class and processes Actions whose `object` is an `ImageObject`.

The `AudioVideoWorker` class extends the `Worker` class and processes Actions whose `object` is a `VideoObject` or an `AudioObject`.

The `DocumentWorker` class extends the `Worker` class and processes Actions whose `object` is a `DocumentObject`, a subclass of `MediaObject`.
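The object types each specialized worker accepts can be summarized as a lookup table. The helper below is a hypothetical illustration (not part of @scipe/workers), e.g. for checking which specialized worker an action is meant for before dispatching it; it returns class names as strings so the sketch runs without the library.

```javascript
// Hypothetical lookup from the @type of an action's object to the
// specialized worker described above. Class names are strings so this
// sketch runs without @scipe/workers.
const WORKER_BY_OBJECT_TYPE = {
  ImageObject: 'ImageWorker',
  VideoObject: 'AudioVideoWorker',
  AudioObject: 'AudioVideoWorker',
  DocumentObject: 'DocumentWorker'
};

function workerFor(action) {
  const object = action && action.object;
  const type = object && typeof object === 'object' ? object['@type'] : undefined;
  // hasOwnProperty avoids matching inherited keys such as "toString".
  return Object.prototype.hasOwnProperty.call(WORKER_BY_OBJECT_TYPE, type)
    ? WORKER_BY_OBJECT_TYPE[type]
    : null;
}

console.log(workerFor({ '@type': 'Action', object: { '@type': 'VideoObject' } })); // AudioVideoWorker
console.log(workerFor({ '@type': 'Action', object: 'http://example.com/objectId' })); // null
```

An action whose `object` is only an IRI carries no type information, so the sketch returns `null` rather than guessing; resolving the IRI first would be needed in that case.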
A CLI is available to quickly launch a broker and all the specialized workers. See `run-workers --help` for more details.
- Install graphicsmagick (`brew install graphicsmagick --with-libtiff` on OSX).
- Install imagemagick (`brew install imagemagick --with-libtiff` on OSX).
- Install ffmpeg (`brew install ffmpeg --with-libvpx --with-libvorbis --with-theora --with-aac --with-libx264` on OSX).
- Install LibreOffice (it needs to be run headless).
- Run `npm install`.

Run `npm test`.
@scipe/workers is dual-licensed under commercial and open source licenses
(AGPLv3) based on the intended
use case. Contact us to learn which license applies to your use case.
