Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,49 @@ await mutex.waitForUnlock();
// ...
```

### Cancelling pending unlocks

Pending unlocks, which are the calls to `mutex.waitForUnlock()` can be cancelled by calling `cancelUnlockWaiters()` on the mutex. This will reject
all pending unlocks with `E_UNLOCKWAITERS_CANCELED`:

Promise style:
```typescript
import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex';

mutex
.waitForUnlock()
.then(() => {
// ...
})
.catch(e => {
if (e === E_UNLOCKWAITERS_CANCELED) {
// ...
}
});
```

async/await:
```typescript
import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex';

try {
await mutex.waitForUnlock();
// ...
} catch (e) {
if (e === E_UNLOCKWAITERS_CANCELED) {
// ...
}
}
```

The error that is thrown can be customized by passing a different error as the second argument to the `Mutex`
constructor:

```typescript
import {E_CANCELED} from 'async-mutex';

const mutex = new Mutex(E_CANCELED, new Error('fancy custom error'));
```

## Semaphore API

Expand Down Expand Up @@ -457,6 +500,50 @@ as it is possible to `acquire` the semaphore with the given weight and priority.
the greatest `priority` values execute first.


### Cancelling pending unlocks

Pending unlocks, which are the calls to `semaphore.waitForUnlock()` can be cancelled by calling `cancelUnlockWaiters()` on the semaphore. This will reject
all pending unlocks with `E_UNLOCKWAITERS_CANCELED`:

Promise style:
```typescript
import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex';

semaphore
.waitForUnlock()
.then(() => {
// ...
})
.catch(e => {
if (e === E_UNLOCKWAITERS_CANCELED) {
// ...
}
});
```

async/await:
```typescript
import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex';

try {
await semaphore.waitForUnlock();
// ...
} catch (e) {
if (e === E_UNLOCKWAITERS_CANCELED) {
// ...
}
}
```

The error that is thrown can be customized by passing a different error as the second argument to the `Semaphore`
constructor:

```typescript
import {E_CANCELED} from 'async-mutex';

const semaphore = new Semaphore(2, E_CANCELED, new Error('fancy custom error'));
```

## Limiting the time waiting for a mutex or semaphore to become available

Sometimes it is desirable to limit the time a program waits for a mutex or
Expand Down
8 changes: 6 additions & 2 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import MutexInterface from './MutexInterface';
import Semaphore from './Semaphore';

class Mutex implements MutexInterface {
constructor(cancelError?: Error) {
this._semaphore = new Semaphore(1, cancelError);
constructor(cancelError?: Error, unlockCancelError?: Error) {
this._semaphore = new Semaphore(1, cancelError, unlockCancelError);
}

async acquire(priority = 0): Promise<MutexInterface.Releaser> {
Expand Down Expand Up @@ -32,6 +32,10 @@ class Mutex implements MutexInterface {
return this._semaphore.cancel();
}

cancelUnlockWaiters(): void {
return this._semaphore.cancelUnlockWaiters();
}

private _semaphore: Semaphore;
}

Expand Down
2 changes: 2 additions & 0 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ interface MutexInterface {
release(): void;

cancel(): void;

cancelUnlockWaiters(): void;
}

namespace MutexInterface {
Expand Down
28 changes: 24 additions & 4 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { E_CANCELED } from './errors';
import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';


Expand All @@ -15,11 +15,12 @@ interface QueueEntry {

interface Waiter {
resolve(): void;
reject(error: unknown): void;
priority: number;
}

class Semaphore implements SemaphoreInterface {
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {}
constructor(private _value: number, private _cancelError: Error = E_CANCELED, private _unlockCancelError: Error = E_UNLOCKWAITERS_CANCELED) { }

acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> {
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);
Expand Down Expand Up @@ -52,9 +53,9 @@ class Semaphore implements SemaphoreInterface {
if (this._couldLockImmediately(weight, priority)) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = [];
insertSorted(this._weightedWaiters[weight - 1], { resolve, priority });
insertSorted(this._weightedWaiters[weight - 1], { resolve, reject, priority });
});
}
}
Expand Down Expand Up @@ -84,6 +85,11 @@ class Semaphore implements SemaphoreInterface {
this._queue = [];
}

cancelUnlockWaiters(): void {
this._weightedWaiters.forEach(waiters => waiters.forEach(waiter => waiter.reject(this._unlockCancelError)));
this._weightedWaiters = [];
}

private _dispatchQueue(): void {
this._drainUnlockWaiters();
while (this._queue.length > 0 && this._queue[0].weight <= this._value) {
Expand Down Expand Up @@ -134,7 +140,17 @@ class Semaphore implements SemaphoreInterface {
weight <= this._value;
}

/**
* `_queue` is sorted in descending order by the `priority` value passed to each lock call.
*/

private _queue: Array<QueueEntry> = [];

/**
* `_weightedWaiters` is sorted in ascending order by the `weight` argument of each `waitForUnlock(weight, priority)` call.
*
* Each element of `_weightedWaiters` contains a list of "unlock waiters", which is sorted in descending order by the `priority` argument of each `waitForUnlock` call.
*/
private _weightedWaiters: Array<Array<Waiter>> = [];
}

Expand All @@ -143,6 +159,10 @@ function insertSorted<T extends Priority>(a: T[], v: T) {
a.splice(i + 1, 0, v);
}

/**
* Finds the index from the end of the list based on `predicate`, the found index is used to inset an item to `_queue` or `_weightedWaiters`.
*/

function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number {
for (let i = a.length - 1; i >= 0; i--) {
if (predicate(a[i])) {
Expand Down
2 changes: 2 additions & 0 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ interface SemaphoreInterface {
release(weight?: number): void;

cancel(): void;

cancelUnlockWaiters(): void;
}

namespace SemaphoreInterface {
Expand Down
1 change: 1 addition & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export const E_TIMEOUT = new Error('timeout while waiting for mutex to become available');
export const E_ALREADY_LOCKED = new Error('mutex already locked');
export const E_CANCELED = new Error('request for lock canceled');
export const E_UNLOCKWAITERS_CANCELED = new Error('request for unlock canceled');
23 changes: 16 additions & 7 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
return sync.cancel();
},

cancelUnlockWaiters(): void {
return sync.cancelUnlockWaiters();
},

waitForUnlock: (weightOrPriority?: number, priority?: number): Promise<void> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
Expand All @@ -90,15 +94,20 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
throw new Error(`invalid weight ${weight}: must be positive`);
}

return new Promise((resolve, reject) => {
return new Promise(async (resolve, reject) => {
const handle = setTimeout(() => reject(timeoutError), timeout);
(isSemaphore(sync)
? sync.waitForUnlock(weight, priority)
: sync.waitForUnlock(priority)
).then(() => {
clearTimeout(handle);

try {
await (isSemaphore(sync)
? sync.waitForUnlock(weight, priority)
: sync.waitForUnlock(priority)
);
resolve();
});
} catch (e) {
reject(e);
} finally {
clearTimeout(handle);
}
});
},

Expand Down
33 changes: 29 additions & 4 deletions test/mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

import { E_CANCELED } from '../src/errors';
import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from '../src/errors';
import Mutex from '../src/Mutex';
import MutexInterface from '../src/MutexInterface';
import { withTimer } from './util';

export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): void => {
export const mutexSuite = (factory: (cancelError?: Error, unlockCancelError?: Error) => MutexInterface): void => {
let mutex: MutexInterface;
let clock: InstalledClock;

Expand Down Expand Up @@ -191,7 +191,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo
assert.strictEqual(v, 3);
});

test('cancel rejects all pending locks witth E_CANCELED', async () => {
test('cancel rejects all pending locks with E_CANCELED', async () => {
await mutex.acquire();

const ticket = mutex.acquire();
Expand Down Expand Up @@ -321,6 +321,31 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo
await clock.tickAsync(0);
assert.strictEqual(flag, false);
});

test('cancelUnlockWaiters rejects all pending unlockWaiters with E_UNLOCKWAITERS_CANCELED', async () => {
await mutex.acquire();

const res1 = mutex.waitForUnlock();
const res2 = mutex.waitForUnlock();

mutex.cancelUnlockWaiters();

await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED);
await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED);
});

test('cancelUnlockWaiters rejects with a custom error if provided', async () => {
const err = new Error();
const mutex = factory(E_CANCELED, err);

await mutex.acquire();

const res1 = mutex.waitForUnlock();

mutex.cancelUnlockWaiters();

await assert.rejects(res1, err);
});
};

suite('Mutex', () => mutexSuite((e) => new Mutex(e)));
suite('Mutex', () => mutexSuite((cancelError, unlockCancelError) => new Mutex(cancelError, unlockCancelError)));
2 changes: 1 addition & 1 deletion test/semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import Semaphore from '../src/Semaphore';
import { semaphoreSuite } from './semaphoreSuite';

suite('Semaphore', () => {
semaphoreSuite((maxConcurrency: number, err?: Error) => new Semaphore(maxConcurrency, err));
semaphoreSuite((maxConcurrency: number, cancelError?: Error, unlockCancelError?: Error) => new Semaphore(maxConcurrency, cancelError, unlockCancelError));
});
43 changes: 40 additions & 3 deletions test/semaphoreSuite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import * as assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

import { E_CANCELED } from '../src/errors';
import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from '../src/errors';
import SemaphoreInterface from '../src/SemaphoreInterface';
import { withTimer } from './util';

export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => SemaphoreInterface): void => {
export const semaphoreSuite = (factory: (maxConcurrency: number, cancelError?: Error, unlockCancelError?: Error) => SemaphoreInterface): void => {
let semaphore: SemaphoreInterface;
let clock: InstalledClock;

Expand Down Expand Up @@ -119,7 +119,7 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
let done = false;
async function lightLoop() {
while (!done) {
const [,release] = await semaphore.acquire(1);
const [, release] = await semaphore.acquire(1);
await new Promise((resolve) => { setTimeout(resolve, 10); });
release();
}
Expand Down Expand Up @@ -596,4 +596,41 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
test('trying to waitForUnlock with a negative weight throws', () => {
assert.throws(() => semaphore.waitForUnlock(-1));
});

test('cancelUnlockWaiters rejects all pending unlockWaiters with E_UNLOCKWAITERS_CANCELED', async () => {
await semaphore.acquire();
await semaphore.acquire();

const res1 = semaphore.waitForUnlock();
const res2 = semaphore.waitForUnlock();

semaphore.cancelUnlockWaiters();

await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED);
await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED);
});

test('cancelUnlockWaiters rejects with a custom error if provided', async () => {
const err = new Error();
const semaphore = factory(2, E_CANCELED, err);

await semaphore.acquire();
await semaphore.acquire();

const res1 = semaphore.waitForUnlock();

semaphore.cancelUnlockWaiters();

await assert.rejects(res1, err);
});

test('cancelUnlockWaiters works fine with isolated weights', async () => {
const res1 = semaphore.waitForUnlock(3);
const res2 = semaphore.waitForUnlock(5);

semaphore.cancelUnlockWaiters();

await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED);
await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED);
});
};
Loading