Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 75 additions & 30 deletions Models/Queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ export class Queue {
data: JSON.stringify({
attempts: options.attempts || 1
}),
priority: options.priority || 0,
priority: Number.isInteger(options.priority) ? options.priority : 0,
active: false,
timeout: (options.timeout >= 0) ? options.timeout : 25000,
created: new Date(),
failed: null
failed: null,
nextValidTime: new Date(),
retryDelay: Number.isInteger(options.retryDelay) ? options.retryDelay : 0,
});

});
Expand All @@ -117,6 +119,20 @@ export class Queue {

}

calculateRemainingLifespan () {
const lifespanRemaining = this.lifespan - (Date.now() - this.startTime);
return (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case.
}

async calculateJobs (jobsLimit) {
if (this.lifespan !== 0) {
return this.getConcurrentJobs(jobsLimit,this.calculateRemainingLifespan());
} else {
return this.getConcurrentJobs(jobsLimit);
}

}

/**
*
* Start processing the queue.
Expand All @@ -140,7 +156,9 @@ export class Queue {
* @param lifespan {number} - If lifespan is passed, the queue will start up and run for lifespan ms, then queue will be stopped.
* @return {boolean|undefined} - False if queue is already started. Otherwise nothing is returned when queue finishes processing.
*/
async start(lifespan = 0) {
async start(lifespan = 0, numberOfJobsToProcess) {
this.lifespan = lifespan;
let jobsProcessed = 0;

// If queue is already running, don't fire up concurrent loop.
if (this.status == 'active') {
Expand All @@ -150,42 +168,33 @@ export class Queue {
this.status = 'active';

// Get jobs to process
const startTime = Date.now();
let lifespanRemaining = null;
let concurrentJobs = [];
if(!this.startTime || this.calculateRemainingLifespan() < 0)
this.startTime = Date.now();

if (lifespan !== 0) {
lifespanRemaining = lifespan - (Date.now() - startTime);
lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case.
concurrentJobs = await this.getConcurrentJobs(lifespanRemaining);
} else {
concurrentJobs = await this.getConcurrentJobs();
}
let concurrentJobs;
concurrentJobs = await this.calculateJobs(numberOfJobsToProcess-jobsProcessed);

while (this.status == 'active' && concurrentJobs.length) {

// Loop over jobs and process them concurrently.
const processingJobs = concurrentJobs.map( job => {
return this.processJob(job);
});
jobsProcessed += concurrentJobs.length;

// Promise Reflect ensures all processingJobs resolve so
// we don't break await early if one of the jobs fails.
await Promise.all(processingJobs.map(promiseReflect));

// Get next batch of jobs.
if (lifespan !== 0) {
lifespanRemaining = lifespan - (Date.now() - startTime);
lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case.
concurrentJobs = await this.getConcurrentJobs(lifespanRemaining);
} else {
concurrentJobs = await this.getConcurrentJobs();
}

concurrentJobs = await this.calculateJobs(numberOfJobsToProcess - jobsProcessed);
}

this.status = 'inactive';

if(this.calculateRemainingLifespan() < 500){
delete this.startTime;
delete this.lifespan;
}
}

/**
Expand All @@ -198,6 +207,8 @@ export class Queue {
*/
stop() {
this.status = 'inactive';
delete this.startTime;
delete this.lifespan;
}

/**
Expand Down Expand Up @@ -240,25 +251,28 @@ export class Queue {
* @param queueLifespanRemaining {number} - The remaining lifespan of the current queue process (defaults to indefinite).
* @return {promise} - Promise resolves to an array of job(s) to be processed next by the queue.
*/
async getConcurrentJobs(queueLifespanRemaining = 0) {
async getConcurrentJobs(jobsLimit = -1, queueLifespanRemaining = 0) {

let concurrentJobs = [];

this.realm.write(() => {

// Get next job from queue.
let nextJob = null;
const now = new Date();

// Build query string
// If queueLife
const timeoutUpperBound = (queueLifespanRemaining - 500 > 0) ? queueLifespanRemaining - 499 : 0; // Only get jobs with timeout at least 500ms < queueLifespanRemaining.

const initialQuery = (queueLifespanRemaining)
? 'active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound
: 'active == FALSE AND failed == null';
? 'active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ' AND nextValidTime <= $0'
: 'active == FALSE AND failed == null AND nextValidTime <= $0';

const limitQuery = jobsLimit > -1 ? ` LIMIT(${jobsLimit})` : '';

let jobs = this.realm.objects('Job')
.filtered(initialQuery)
.filtered(initialQuery + limitQuery, now)
.sorted([['priority', true], ['created', false]]);

if (jobs.length) {
Expand All @@ -271,11 +285,11 @@ export class Queue {
const concurrency = this.worker.getConcurrency(nextJob.name);

const allRelatedJobsQuery = (queueLifespanRemaining)
? 'name == "'+ nextJob.name +'" AND active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound
: 'name == "'+ nextJob.name +'" AND active == FALSE AND failed == null';
? 'name == "'+ nextJob.name +'" AND active == FALSE AND failed == null AND timeout > 0 AND timeout < ' + timeoutUpperBound + ' AND nextValidTime <= $0'
: 'name == "'+ nextJob.name +'" AND active == FALSE AND failed == null AND nextValidTime <= $0';

const allRelatedJobs = this.realm.objects('Job')
.filtered(allRelatedJobsQuery)
.filtered(allRelatedJobsQuery + limitQuery, now)
.sorted([['priority', true], ['created', false]]);

let jobsToMarkActive = allRelatedJobs.slice(0, concurrency);
Expand All @@ -293,7 +307,7 @@ export class Queue {
// Reselect now-active concurrent jobs by id.
const reselectQuery = concurrentJobIds.map( jobId => 'id == "' + jobId + '"').join(' OR ');
const reselectedJobs = this.realm.objects('Job')
.filtered(reselectQuery)
.filtered(reselectQuery + limitQuery)
.sorted([['priority', true], ['created', false]]);

concurrentJobs = reselectedJobs.slice(0, concurrency);
Expand Down Expand Up @@ -379,8 +393,13 @@ export class Queue {
job.failed = new Date();
}

job.nextValidTime = new Date(new Date().getTime() + job.retryDelay);
});

if(job.retryDelay && job.retryDelay > 0) setTimeout(() => {
this.start(this.lifespan ? this.lifespan : 0);
},job.retryDelay);

// Execute job onFailure lifecycle callback.
this.worker.executeJobLifecycleCallback('onFailure', jobName, jobId, jobPayload);

Expand Down Expand Up @@ -428,6 +447,32 @@ export class Queue {

}

/**
* Delete a job in the queue with jobId
* @param jobId {string} - id associated with job
*/
flushJob(jobId) {
try {
if(jobId) {
this.realm.write(() => {
let jobs = this.realm
.objects('Job')
.filtered(`id == "${jobId}"`);
if(jobs.length) {
this.realm.delete(jobs);
return;
}
});
}
} catch (e) {
console.log('flushJob failed', jobId);
}
}

async close() {
await this.stop();
await this.realm.close();
}

}

Expand Down
6 changes: 4 additions & 2 deletions config/Database.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ const JobSchema = {
active: { type: 'bool', default: false}, // Whether or not job is currently being processed.
timeout: 'int', // Job timeout in ms. 0 means no timeout.
created: 'date', // Job creation timestamp.
failed: 'date?' // Job failure timestamp (null until failure).
failed: 'date?', // Job failure timestamp (null until failure).
nextValidTime: 'date?', // Next timestamp it would be valid to execute the job calculated from retry Delay at time of fail
retryDelay: 'int',
}
};

Expand Down Expand Up @@ -45,4 +47,4 @@ export default class Database {

}

}
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
},
"homepage": "https://github.com/billmalarky/react-native-queue#readme",
"dependencies": {
"moment": "^2.24.0",
"promise-reflect": "^1.1.0",
"react-native-uuid": "^1.4.9",
"realm": "^2.0.12"
"realm": "^3.6.4"
},
"devDependencies": {
"babel-eslint": "^8.0.3",
Expand Down
Loading