Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ lib-cov

# Coverage directory used by tools like istanbul
coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
Expand Down
39 changes: 27 additions & 12 deletions lib/bigcommerce.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const isIp = require('is-ip');

const logger = require('debug')('node-bigcommerce:bigcommerce'),
crypto = require('crypto'),
Request = require('./request');
Request = require('./request'),
{ Semaphore, TokenBucket } = require('./limiter');

class BigCommerce {
constructor(config) {
Expand All @@ -37,6 +38,12 @@ class BigCommerce {

this.config = config;
this.apiVersion = this.config.apiVersion || 'v2';

if (config.limiter) {
const { maxConcurrent, requestsPerSecond } = config.limiter;
this._semaphore = maxConcurrent ? new Semaphore(maxConcurrent) : null;
this._tokenBucket = requestsPerSecond ? new TokenBucket(requestsPerSecond) : null;
}
}

/** Verify legacy signed_payload (can be ignored in favor of JWT) **/
Expand Down Expand Up @@ -187,7 +194,8 @@ class BigCommerce {
'X-Auth-Token': this.config.accessToken
}, this.config.headers || {}),
failOnLimitReached: this.config.failOnLimitReached,
agent: this.config.agent
agent: this.config.agent,
maxRetries: this.config.maxRetries
});
}

Expand All @@ -199,19 +207,26 @@ class BigCommerce {
);
}

const extension = this.config.responseType === 'xml' ? '.xml' : '';
const version = this.apiVersion;
if (this._tokenBucket) await this._tokenBucket.consume();
if (this._semaphore) await this._semaphore.acquire();
Comment on lines +210 to +211
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Consume rate tokens after semaphore acquisition

request() currently calls this._tokenBucket.consume() before waiting on this._semaphore.acquire(), so when both limiter options are enabled, requests can burn tokens while queued on concurrency and then burst once a slot opens. In practice, with maxConcurrent=1 and requestsPerSecond=1, one long request followed by several short ones can start multiple queued calls within the same second, violating the configured outbound rate and causing avoidable 429s. Acquire the semaphore first (or re-check token availability immediately before dispatch) so pacing reflects actual send time.

Useful? React with 👍 / 👎.


const request = this.createAPIRequest();
try {
const extension = this.config.responseType === 'xml' ? '.xml' : '';
const version = this.apiVersion;

let fullPath = `/stores/${this.config.storeHash}/${version}`;
if (version !== 'v3') {
fullPath += path.replace(/(\?|$)/, extension + '$1');
} else {
fullPath += path;
}
const request = this.createAPIRequest();

let fullPath = `/stores/${this.config.storeHash}/${version}`;
if (version !== 'v3') {
fullPath += path.replace(/(\?|$)/, extension + '$1');
} else {
fullPath += path;
}

return await request.run(type, fullPath, data);
return await request.run(type, fullPath, data);
} finally {
if (this._semaphore) this._semaphore.release();
}
}

getTime() {
Expand Down
67 changes: 67 additions & 0 deletions lib/limiter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';

/**
* Rate limiter primitives for controlling API request concurrency and throughput.
*/

/**
* Semaphore limits the number of concurrent in-flight operations.
*/
class Semaphore {
constructor(limit) {
this.limit = limit;
this.running = 0;
this.queue = [];
}

acquire() {
if (this.running < this.limit) {
this.running++;
return Promise.resolve();
}
return new Promise(resolve => {
this.queue.push(() => {
this.running++;
resolve();
});
});
}

release() {
this.running--;
const next = this.queue.shift();
if (next) next();
}
}

/**
* TokenBucket limits the rate of operations to tokensPerSecond.
* Starts full, allowing an initial burst up to capacity.
*/
class TokenBucket {
constructor(tokensPerSecond) {
this.tokensPerSecond = tokensPerSecond;
this.tokens = tokensPerSecond;
this.lastRefill = Date.now();
}

async consume() {
this._refill();
if (this.tokens >= 1) {
this.tokens -= 1;
return;
}
const waitMs = 1000 / this.tokensPerSecond;
await new Promise(resolve => setTimeout(resolve, waitMs));
return this.consume();
}

_refill() {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
this.tokens = Math.min(this.tokensPerSecond, this.tokens + (elapsed * this.tokensPerSecond));
this.lastRefill = now;
}
}

module.exports = { Semaphore, TokenBucket };
27 changes: 22 additions & 5 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,22 @@ function parseResponse(res, body, resolve, reject) {
}

class Request {
constructor(hostname, { headers = { }, failOnLimitReached = false, agent = null } = { }) {
constructor(hostname, {
headers = { },
failOnLimitReached = false,
agent = null,
maxRetries = 10
} = { }) {
if (!hostname) throw new Error('The hostname is required to make the call to the server.');

this.hostname = hostname;
this.headers = headers;
this.failOnLimitReached = failOnLimitReached;
this.agent = agent;
this.maxRetries = maxRetries;
}

run(method, path, data) {
run(method, path, data, _attempt = 0) {
logger(`Requesting Data from: https://${this.hostname}${path} Using the ${method} method`);

const dataString = JSON.stringify(data);
Expand Down Expand Up @@ -92,15 +98,26 @@ class Request {
return reject(err);
}

logger(`You have reached the rate limit for the BigCommerce API, we will retry again in ${timeToWait} seconds.`);
if (_attempt >= this.maxRetries) {
const err = new Error(`Rate limit reached: retries exhausted after ${this.maxRetries} attempts. Please retry in ${timeToWait} seconds.`);
err.retryAfter = Number(timeToWait);
err.code = 429;

return reject(err);
}

const jitter = Math.floor(Math.random() * 500);
const delay = (timeToWait * 1000) + jitter;

logger(`You have reached the rate limit for the BigCommerce API, we will retry again in ${timeToWait} seconds (attempt ${_attempt + 1}/${this.maxRetries}).`);

return setTimeout(() => {
logger('Restarting request call after suggested time');

this.run(method, path, data)
this.run(method, path, data, _attempt + 1)
.then(resolve)
.catch(reject);
}, timeToWait * 1000);
}, delay);
}

res.on('data', chunk => body += chunk);
Expand Down
86 changes: 86 additions & 0 deletions test/bigcommerce.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,32 @@ describe('BigCommerce', () => {
it('should set api version to a default', () => {
new BigCommerce({ apiVersion: 'v3' }).apiVersion.should.equal('v3');
});

context('given limiter config', () => {
it('should create a semaphore when maxConcurrent is provided', () => {
const limitedBc = new BigCommerce({ limiter: { maxConcurrent: 3 } });
should.exist(limitedBc._semaphore);
limitedBc._semaphore.limit.should.equal(3);
});

it('should create a token bucket when requestsPerSecond is provided', () => {
const limitedBc = new BigCommerce({ limiter: { requestsPerSecond: 5 } });
should.exist(limitedBc._tokenBucket);
limitedBc._tokenBucket.tokensPerSecond.should.equal(5);
});

it('should create both gates when both options are provided', () => {
const limitedBc = new BigCommerce({ limiter: { maxConcurrent: 5, requestsPerSecond: 10 } });
should.exist(limitedBc._semaphore);
should.exist(limitedBc._tokenBucket);
});

it('should not create any gates when limiter is omitted', () => {
const plainBc = new BigCommerce({ test: true });
should.not.exist(plainBc._semaphore);
should.not.exist(plainBc._tokenBucket);
});
});
});

describe('#verify', () => {
Expand Down Expand Up @@ -348,4 +374,64 @@ describe('BigCommerce', () => {
});
});
});

describe('#request with limiter', () => {
beforeEach(() => {
self.requestStub = self.sandbox.stub(Request.prototype, 'run')
.returns(Promise.resolve({ ok: true }));
});

it('should pass maxRetries config through to the Request via createAPIRequest', () => {
const bcWithRetries = new BigCommerce({
accessToken: '123456',
storeHash: '12abc',
maxRetries: 5
});
bcWithRetries.config.maxRetries.should.equal(5);
const req = bcWithRetries.createAPIRequest();
req.maxRetries.should.equal(5);
});

it('should acquire and release the semaphore on each request', () => {
const pacedBc = new BigCommerce({
accessToken: '123456',
storeHash: '12abc',
limiter: { maxConcurrent: 2 }
});

const acquireSpy = self.sandbox.spy(pacedBc._semaphore, 'acquire');
const releaseSpy = self.sandbox.spy(pacedBc._semaphore, 'release');

return pacedBc.get('/products').then(() => {
sinon.assert.calledOnce(acquireSpy);
sinon.assert.calledOnce(releaseSpy);
});
});

it('should release the semaphore even when the request fails', () => {
const pacedBc = new BigCommerce({
accessToken: '123456',
storeHash: '12abc',
limiter: { maxConcurrent: 2 }
});

self.requestStub.returns(Promise.reject(new Error('network error')));
const releaseSpy = self.sandbox.spy(pacedBc._semaphore, 'release');

return pacedBc.get('/products')
.catch(() => {
sinon.assert.calledOnce(releaseSpy);
});
});

it('should make requests normally without pacing config', () => {
const normalBc = new BigCommerce({
accessToken: '123456',
storeHash: '12abc'
});
return normalBc.get('/products').then(res => {
res.should.deep.equal({ ok: true });
});
});
});
});
Loading