From 4d303ac2771ba0823fd05d2e3f737987a71320ed Mon Sep 17 00:00:00 2001 From: alveifbklsiu259 Date: Sun, 16 Jun 2024 01:33:15 +0800 Subject: [PATCH 1/2] Add new feature to mutex and semaphore cancelUnlockWaiters: Cancel pending unlocks. --- README.md | 87 +++++++++++++++++++++++++++++++++++++++ src/Mutex.ts | 8 +++- src/MutexInterface.ts | 2 + src/Semaphore.ts | 14 +++++-- src/SemaphoreInterface.ts | 2 + src/errors.ts | 1 + src/withTimeout.ts | 23 +++++++---- test/mutex.ts | 33 +++++++++++++-- test/semaphore.ts | 2 +- test/semaphoreSuite.ts | 43 +++++++++++++++++-- test/withTimeout.ts | 8 ++-- 11 files changed, 198 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 95c30b8..48e1b7e 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,49 @@ await mutex.waitForUnlock(); // ... ``` +### Cancelling pending unlocks + +Pending unlocks, which are the calls to `mutex.waitForUnlock()` can be cancelled by calling `cancelUnlockWaiters()` on the mutex. This will reject +all pending unlocks with `E_UNLOCKWAITERS_CANCELED`: + +Promise style: +```typescript +import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex'; + +mutex + .waitForUnlock() + .then(() => { + // ... + }) + .catch(e => { + if (e === E_UNLOCKWAITERS_CANCELED) { + // ... + } + }); +``` + +async/await: +```typescript +import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex'; + +try { + await mutex.waitForUnlock(); + // ... +} catch (e) { + if (e === E_UNLOCKWAITERS_CANCELED) { + // ... + } +} +``` + +The error that is thrown can be customized by passing a different error as the second argument to the `Mutex` +constructor: + +```typescript +import {E_CANCELED} from 'async-mutex'; + +const mutex = new Mutex(E_CANCELED, new Error('fancy custom error')); +``` ## Semaphore API @@ -457,6 +500,50 @@ as it is possible to `acquire` the semaphore with the given weight and priority. the greatest `priority` values execute first. +### Cancelling pending unlocks + +Pending unlocks, which are the calls to `semaphore.waitForUnlock()` can be cancelled by calling `cancelUnlockWaiters()` on the semaphore. This will reject +all pending unlocks with `E_UNLOCKWAITERS_CANCELED`: + +Promise style: +```typescript +import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex'; + +semaphore + .waitForUnlock() + .then(() => { + // ... + }) + .catch(e => { + if (e === E_UNLOCKWAITERS_CANCELED) { + // ... + } + }); +``` + +async/await: +```typescript +import {E_UNLOCKWAITERS_CANCELED} from 'async-mutex'; + +try { + await semaphore.waitForUnlock(); + // ... +} catch (e) { + if (e === E_UNLOCKWAITERS_CANCELED) { + // ... + } +} +``` + +The error that is thrown can be customized by passing a different error as the second argument to the `Semaphore` +constructor: + +```typescript +import {E_CANCELED} from 'async-mutex'; + +const semaphore = new Semaphore(2, E_CANCELED, new Error('fancy custom error')); +``` + ## Limiting the time waiting for a mutex or semaphore to become available Sometimes it is desirable to limit the time a program waits for a mutex or diff --git a/src/Mutex.ts b/src/Mutex.ts index 20cffd6..cafd961 100644 --- a/src/Mutex.ts +++ b/src/Mutex.ts @@ -2,8 +2,8 @@ import MutexInterface from './MutexInterface'; import Semaphore from './Semaphore'; class Mutex implements MutexInterface { - constructor(cancelError?: Error) { - this._semaphore = new Semaphore(1, cancelError); + constructor(cancelError?: Error, unlockCancelError?: Error) { + this._semaphore = new Semaphore(1, cancelError, unlockCancelError); } async acquire(priority = 0): Promise { @@ -32,6 +32,10 @@ class Mutex implements MutexInterface { return this._semaphore.cancel(); } + cancelUnlockWaiters(): void { + return this._semaphore.cancelUnlockWaiters(); + } + private _semaphore: Semaphore; } diff --git a/src/MutexInterface.ts b/src/MutexInterface.ts index c09f22a..2fec0a1 100644 --- a/src/MutexInterface.ts +++ b/src/MutexInterface.ts @@ -10,6 +10,8 @@ interface MutexInterface { release(): void; cancel(): void; + + cancelUnlockWaiters(): void; } namespace MutexInterface { diff --git a/src/Semaphore.ts b/src/Semaphore.ts index b912f2d..8c4b47a 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -1,4 +1,4 @@ -import { E_CANCELED } from './errors'; +import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from './errors'; import SemaphoreInterface from './SemaphoreInterface'; @@ -15,11 +15,12 @@ interface QueueEntry { interface Waiter { resolve(): void; + reject(error: unknown): void; priority: number; } class Semaphore implements SemaphoreInterface { - constructor(private _value: number, private _cancelError: Error = E_CANCELED) {} + constructor(private _value: number, private _cancelError: Error = E_CANCELED, private _unlockCancelError: Error = E_UNLOCKWAITERS_CANCELED) { } acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> { if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`); @@ -52,9 +53,9 @@ class Semaphore implements SemaphoreInterface { if (this._couldLockImmediately(weight, priority)) { return Promise.resolve(); } else { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { if (!this._weightedWaiters[weight - 1]) this._weightedWaiters[weight - 1] = []; - insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); + insertSorted(this._weightedWaiters[weight - 1], { resolve, reject, priority }); }); } } @@ -84,6 +85,11 @@ class Semaphore implements SemaphoreInterface { this._queue = []; } + cancelUnlockWaiters(): void { + this._weightedWaiters.forEach(waiters => waiters.forEach(waiter => waiter.reject(this._unlockCancelError))); + this._weightedWaiters = []; + } + private _dispatchQueue(): void { this._drainUnlockWaiters(); while (this._queue.length > 0 && this._queue[0].weight <= this._value) { diff --git a/src/SemaphoreInterface.ts b/src/SemaphoreInterface.ts index 14e40f6..fa6e320 100644 --- a/src/SemaphoreInterface.ts +++ b/src/SemaphoreInterface.ts @@ -14,6 +14,8 @@ interface SemaphoreInterface { release(weight?: number): void; cancel(): void; + + cancelUnlockWaiters(): void; } namespace SemaphoreInterface { diff --git a/src/errors.ts b/src/errors.ts index 05d84ea..1f9caf5 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,3 +1,4 @@ export const E_TIMEOUT = new Error('timeout while waiting for mutex to become available'); export const E_ALREADY_LOCKED = new Error('mutex already locked'); export const E_CANCELED = new Error('request for lock canceled'); +export const E_UNLOCKWAITERS_CANCELED = new Error('request for unlock canceled'); diff --git a/src/withTimeout.ts b/src/withTimeout.ts index 77e232f..4ff6645 100644 --- a/src/withTimeout.ts +++ b/src/withTimeout.ts @@ -78,6 +78,10 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: return sync.cancel(); }, + cancelUnlockWaiters(): void { + return sync.cancelUnlockWaiters(); + }, + waitForUnlock: (weightOrPriority?: number, priority?: number): Promise => { let weight: number | undefined; if (isSemaphore(sync)) { @@ -90,15 +94,20 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: throw new Error(`invalid weight ${weight}: must be positive`); } - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { const handle = setTimeout(() => reject(timeoutError), timeout); - (isSemaphore(sync) - ? sync.waitForUnlock(weight, priority) - : sync.waitForUnlock(priority) - ).then(() => { - clearTimeout(handle); + + try { + await (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ); resolve(); - }); + } catch (e) { + reject(e); + } finally { + clearTimeout(handle); + } }); }, diff --git a/test/mutex.ts b/test/mutex.ts index d945209..834b91f 100644 --- a/test/mutex.ts +++ b/test/mutex.ts @@ -2,12 +2,12 @@ import * as assert from 'assert'; import { InstalledClock, install } from '@sinonjs/fake-timers'; -import { E_CANCELED } from '../src/errors'; +import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from '../src/errors'; import Mutex from '../src/Mutex'; import MutexInterface from '../src/MutexInterface'; import { withTimer } from './util'; -export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): void => { +export const mutexSuite = (factory: (cancelError?: Error, unlockCancelError?: Error) => MutexInterface): void => { let mutex: MutexInterface; let clock: InstalledClock; @@ -191,7 +191,7 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo assert.strictEqual(v, 3); }); - test('cancel rejects all pending locks witth E_CANCELED', async () => { + test('cancel rejects all pending locks with E_CANCELED', async () => { await mutex.acquire(); const ticket = mutex.acquire(); @@ -321,6 +321,31 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo await clock.tickAsync(0); assert.strictEqual(flag, false); }); + + test('cancelUnlockWaiters rejects all pending unlockWaiters with E_UNLOCKWAITERS_CANCELED', async () => { + await mutex.acquire(); + + const res1 = mutex.waitForUnlock(); + const res2 = mutex.waitForUnlock(); + + mutex.cancelUnlockWaiters(); + + await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED); + await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED); + }); + + test('cancelUnlockWaiters rejects with a custom error if provided', async () => { + const err = new Error(); + const mutex = factory(E_CANCELED, err); + + await mutex.acquire(); + + const res1 = mutex.waitForUnlock(); + + mutex.cancelUnlockWaiters(); + + await assert.rejects(res1, err); + }); }; -suite('Mutex', () => mutexSuite((e) => new Mutex(e))); +suite('Mutex', () => mutexSuite((cancelError, unlockCancelError) => new Mutex(cancelError, unlockCancelError))); diff --git a/test/semaphore.ts b/test/semaphore.ts index 402a950..b3b4003 100644 --- a/test/semaphore.ts +++ b/test/semaphore.ts @@ -2,5 +2,5 @@ import Semaphore from '../src/Semaphore'; import { semaphoreSuite } from './semaphoreSuite'; suite('Semaphore', () => { - semaphoreSuite((maxConcurrency: number, err?: Error) => new Semaphore(maxConcurrency, err)); + semaphoreSuite((maxConcurrency: number, cancelError?: Error, unlockCancelError?: Error) => new Semaphore(maxConcurrency, cancelError, unlockCancelError)); }); diff --git a/test/semaphoreSuite.ts b/test/semaphoreSuite.ts index c75a179..e4ce909 100644 --- a/test/semaphoreSuite.ts +++ b/test/semaphoreSuite.ts @@ -2,11 +2,11 @@ import * as assert from 'assert'; import { InstalledClock, install } from '@sinonjs/fake-timers'; -import { E_CANCELED } from '../src/errors'; +import { E_CANCELED, E_UNLOCKWAITERS_CANCELED } from '../src/errors'; import SemaphoreInterface from '../src/SemaphoreInterface'; import { withTimer } from './util'; -export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => SemaphoreInterface): void => { +export const semaphoreSuite = (factory: (maxConcurrency: number, cancelError?: Error, unlockCancelError?: Error) => SemaphoreInterface): void => { let semaphore: SemaphoreInterface; let clock: InstalledClock; @@ -119,7 +119,7 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => let done = false; async function lightLoop() { while (!done) { - const [,release] = await semaphore.acquire(1); + const [, release] = await semaphore.acquire(1); await new Promise((resolve) => { setTimeout(resolve, 10); }); release(); } @@ -596,4 +596,41 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => test('trying to waitForUnlock with a negative weight throws', () => { assert.throws(() => semaphore.waitForUnlock(-1)); }); + + test('cancelUnlockWaiters rejects all pending unlockWaiters with E_UNLOCKWAITERS_CANCELED', async () => { + await semaphore.acquire(); + await semaphore.acquire(); + + const res1 = semaphore.waitForUnlock(); + const res2 = semaphore.waitForUnlock(); + + semaphore.cancelUnlockWaiters(); + + await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED); + await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED); + }); + + test('cancelUnlockWaiters rejects with a custom error if provided', async () => { + const err = new Error(); + const semaphore = factory(2, E_CANCELED, err); + + await semaphore.acquire(); + await semaphore.acquire(); + + const res1 = semaphore.waitForUnlock(); + + semaphore.cancelUnlockWaiters(); + + await assert.rejects(res1, err); + }); + + test('cancelUnlockWaiters works fine with isolated weights', async () => { + const res1 = semaphore.waitForUnlock(3); + const res2 = semaphore.waitForUnlock(5); + + semaphore.cancelUnlockWaiters(); + + await assert.rejects(res1, E_UNLOCKWAITERS_CANCELED); + await assert.rejects(res2, E_UNLOCKWAITERS_CANCELED); + }); }; diff --git a/test/withTimeout.ts b/test/withTimeout.ts index 1603655..0c283fb 100644 --- a/test/withTimeout.ts +++ b/test/withTimeout.ts @@ -130,7 +130,7 @@ suite('withTimeout', () => { }); }); - suite('Mutex API', () => mutexSuite((e) => withTimeout(new Mutex(e), 500))); + suite('Mutex API', () => mutexSuite((cancelError, unlockCancelError) => withTimeout(new Mutex(cancelError, unlockCancelError), 500))); }); suite('Semaphore', () => { @@ -222,7 +222,7 @@ suite('withTimeout', () => { assert.strictEqual(flag, false); }); - test('after a timeout, runExclusive automatically releases the semamphore once it is acquired', async () => { + test('after a timeout, runExclusive automatically releases the semaphore once it is acquired', async () => { semaphore.acquire().then(([, release]) => setTimeout(release, 150)); const result = semaphore.runExclusive(() => undefined); @@ -270,8 +270,8 @@ suite('withTimeout', () => { }); suite('Semaphore API', () => - semaphoreSuite((maxConcurrency: number, err?: Error) => - withTimeout(new Semaphore(maxConcurrency, err), 500) + semaphoreSuite((maxConcurrency: number, cancelError?: Error, unlockCancelError?: Error) => + withTimeout(new Semaphore(maxConcurrency, cancelError, unlockCancelError), 500) ) ); }); From 39e2a386341c092925afb1ef80b80f8b624256b9 Mon Sep 17 00:00:00 2001 From: alveifbklsiu259 Date: Sun, 16 Jun 2024 17:10:38 +0800 Subject: [PATCH 2/2] Add documentation to Semaphore --- src/Semaphore.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/Semaphore.ts b/src/Semaphore.ts index 8c4b47a..68f4726 100644 --- a/src/Semaphore.ts +++ b/src/Semaphore.ts @@ -140,7 +140,17 @@ class Semaphore implements SemaphoreInterface { weight <= this._value; } + /** + * `_queue` is sorted in descending order by the `priority` value passed to each lock call. + */ + private _queue: Array = []; + + /** + * `_weightedWaiters` is sorted in ascending order by the `weight` argument of each `waitForUnlock(weight, priority)` call. + * + * Each element of `_weightedWaiters` contains a list of "unlock waiters", which is sorted in descending order by the `priority` argument of each `waitForUnlock` call. + */ private _weightedWaiters: Array> = []; } @@ -149,6 +159,10 @@ function insertSorted(a: T[], v: T) { a.splice(i + 1, 0, v); } +/** + * Finds the index from the end of the list based on `predicate`, the found index is used to inset an item to `_queue` or `_weightedWaiters`. + */ + function findIndexFromEnd(a: T[], predicate: (e: T) => boolean): number { for (let i = a.length - 1; i >= 0; i--) { if (predicate(a[i])) {