Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,79 @@ tryAcquire(semaphoreOrMutex, new Error('new fancy error'))
// ...
});
```

# Migration Guide
When migrating away from version [] to version [], make the following changes:
- The `acquire`, `runExclusive`, and `waitForUnlock` methods no longer accept weight or priority parameters directly. They are wrapped in an optional options object instead. For example, replace:

Examples with `Semaphore`:
```typescript
semaphore.acquire() // No change needed

semaphore.acquire(2, 5) // Old
semaphore.acquire({ weight: 2, priority: 5 }) // New

semaphore.acquire(undefined, 5) // Old
semaphore.acquire({ priority: 5 }) // New
semaphore.acquire({ weight: 1, priority: 5 }) // Equivalent

semaphore.acquire(2) // Old
semaphore.acquire({ weight: 5 }) // New
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, that's not the same weight!



// runExclusive: same change to weight and priority as with acquire()

// No change:
semaphore.runExclusive((value: number) => {
doSomeWork()
});

// Old
semaphore.runExclusive((value: number) => {
doSomeWork()
}, 2, -1)

// New
semaphore.runExclusive((value: number) => {
doSomeWork()
}, { weight: 2, priority: -1 })


// waitForUnlock: same

semaphore.waitForUnlock(2, 3) // Old
semaphore.waitForUnlock({ weight: 2, priority: 3 }) // New
```

Examples with `Mutex`:
```typescript
mutex.acquire() // No change needed

mutex.acquire(-1) // Old
mutex.acquire({ priority: -1 }) // New

// No change:
mutex.runExclusive(() => {
doSomeWork()
})

// Old:
mutex.runExclusive(() => {
doSomeWork()
}, 1)

// New:
mutex.runExclusive(() => {
doSomeWork()
}, { priority: 1 })


mutex.waitForUnlock() // No change

mutex.waitForUnlock(5) // Old
mutex.waitForUnlock({ priority: 5 }) // New
```

# License

Feel free to use this library under the conditions of the MIT license.
14 changes: 7 additions & 7 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import MutexInterface from './MutexInterface';
import { MutexOptions, MutexInterface } from './MutexInterface';
import Semaphore from './Semaphore';

class Mutex implements MutexInterface {
constructor(cancelError?: Error) {
this._semaphore = new Semaphore(1, cancelError);
}

async acquire(priority = 0): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire(1, priority);
async acquire(options?: MutexOptions): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire(options);

return releaser;
}

runExclusive<T>(callback: MutexInterface.Worker<T>, priority = 0): Promise<T> {
return this._semaphore.runExclusive(() => callback(), 1, priority);
runExclusive<T>(callback: MutexInterface.Worker<T>, options?: MutexOptions): Promise<T> {
return this._semaphore.runExclusive(() => callback(), options);
}

isLocked(): boolean {
return this._semaphore.isLocked();
}

waitForUnlock(priority = 0): Promise<void> {
return this._semaphore.waitForUnlock(1, priority);
waitForUnlock(options?: MutexOptions): Promise<void> {
return this._semaphore.waitForUnlock(options);
}

release(): void {
Expand Down
14 changes: 9 additions & 5 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface MutexInterface {
acquire(priority?: number): Promise<MutexInterface.Releaser>;
export interface MutexInterface {
acquire(options?: MutexOptions): Promise<MutexInterface.Releaser>;

runExclusive<T>(callback: MutexInterface.Worker<T>, priority?: number): Promise<T>;
runExclusive<T>(callback: MutexInterface.Worker<T>, options?: MutexOptions): Promise<T>;

waitForUnlock(priority?: number): Promise<void>;
waitForUnlock(options?: MutexOptions): Promise<void>;

isLocked(): boolean;

Expand All @@ -12,7 +12,11 @@ interface MutexInterface {
cancel(): void;
}

namespace MutexInterface {
export interface MutexOptions {
priority?: number;
}

export namespace MutexInterface {
export interface Releaser {
(): void;
}
Expand Down
15 changes: 10 additions & 5 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { E_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';
import { SemaphoreOptions, SemaphoreInterface } from './SemaphoreInterface';


interface Priority {
Expand All @@ -21,7 +21,10 @@ interface Waiter {
class Semaphore implements SemaphoreInterface {
constructor(private _value: number, private _cancelError: Error = E_CANCELED) {}

acquire(weight = 1, priority = 0): Promise<[number, SemaphoreInterface.Releaser]> {
acquire(options?: SemaphoreOptions): Promise<[number, SemaphoreInterface.Releaser]> {
options = options || { weight: 1, priority: 0 };
const weight = options.weight !== undefined ? options.weight : 1;
const priority = options.priority !== undefined ? options.priority : 0;
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

return new Promise((resolve, reject) => {
Expand All @@ -36,8 +39,8 @@ class Semaphore implements SemaphoreInterface {
});
}

async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight = 1, priority = 0): Promise<T> {
const [value, release] = await this.acquire(weight, priority);
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, options?: SemaphoreOptions): Promise<T> {
const [value, release] = await this.acquire(options);

try {
return await callback(value);
Expand All @@ -46,7 +49,9 @@ class Semaphore implements SemaphoreInterface {
}
}

waitForUnlock(weight = 1, priority = 0): Promise<void> {
waitForUnlock(options?: SemaphoreOptions): Promise<void> {
const weight = options?.weight !== undefined ? options.weight : 1;
const priority = options?.priority || 0;
if (weight <= 0) throw new Error(`invalid weight ${weight}: must be positive`);

if (this._couldLockImmediately(weight, priority)) {
Expand Down
15 changes: 10 additions & 5 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
interface SemaphoreInterface {
acquire(weight?: number, priority?: number): Promise<[number, SemaphoreInterface.Releaser]>;
export interface SemaphoreInterface {
acquire(options?: SemaphoreOptions): Promise<[number, SemaphoreInterface.Releaser]>;

runExclusive<T>(callback: SemaphoreInterface.Worker<T>, weight?: number, priority?: number): Promise<T>;
runExclusive<T>(callback: SemaphoreInterface.Worker<T>, options?: SemaphoreOptions): Promise<T>;

waitForUnlock(weight?: number, priority?: number): Promise<void>;
waitForUnlock(options?: SemaphoreOptions): Promise<void>;

isLocked(): boolean;

Expand All @@ -16,7 +16,12 @@ interface SemaphoreInterface {
cancel(): void;
}

namespace SemaphoreInterface {
export interface SemaphoreOptions {
weight?: number;
priority?: number;
}

export namespace SemaphoreInterface {
export interface Releaser {
(): void;
}
Expand Down
50 changes: 14 additions & 36 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { E_TIMEOUT } from './errors';
import MutexInterface from './MutexInterface';
import SemaphoreInterface from './SemaphoreInterface';
import { MutexInterface, MutexOptions } from './MutexInterface';
import { SemaphoreInterface, SemaphoreOptions } from './SemaphoreInterface';

export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError?: Error): MutexInterface;
export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface;
export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT): any {
return {
acquire: (weightOrPriority?: number, priority?: number): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
acquire: (options?: SemaphoreOptions | MutexOptions): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> => {
options = options || { weight: 1, priority: 0 };
if ('weight' in options && options.weight !== undefined && options.weight <= 0) {
throw new Error(`invalid weight ${options.weight}: must be positive`);
}

return new Promise(async (resolve, reject) => {
Expand All @@ -28,10 +22,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}, timeout);

try {
const ticket = await (isSemaphore(sync)
? sync.acquire(weight, priority)
: sync.acquire(priority)
);
const ticket = await sync.acquire(options);
if (isTimeout) {
const release = Array.isArray(ticket) ? ticket[1] : ticket;

Expand All @@ -50,11 +41,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
});
},

async runExclusive<T>(callback: (value?: number) => Promise<T> | T, weight?: number, priority?: number): Promise<T> {
async runExclusive<T>(callback: (value?: number) => Promise<T> | T, options?: MutexOptions | SemaphoreOptions): Promise<T> {
let release: () => void = () => undefined;

try {
const ticket = await this.acquire(weight, priority);
const ticket = await this.acquire(options);

if (Array.isArray(ticket)) {
release = ticket[1];
Expand All @@ -78,24 +69,15 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
return sync.cancel();
},

waitForUnlock: (weightOrPriority?: number, priority?: number): Promise<void> => {
let weight: number | undefined;
if (isSemaphore(sync)) {
weight = weightOrPriority;
} else {
weight = undefined;
priority = weightOrPriority;
}
if (weight !== undefined && weight <= 0) {
throw new Error(`invalid weight ${weight}: must be positive`);
waitForUnlock: (options?: MutexOptions | SemaphoreOptions): Promise<void> => {
options = options || { weight: 1, priority: 0 };
if ('weight' in options && options.weight !== undefined && options.weight <= 0) {
throw new Error(`invalid weight ${options.weight}: must be positive`);
}

return new Promise((resolve, reject) => {
const handle = setTimeout(() => reject(timeoutError), timeout);
(isSemaphore(sync)
? sync.waitForUnlock(weight, priority)
: sync.waitForUnlock(priority)
).then(() => {
sync.waitForUnlock(options).then(() => {
clearTimeout(handle);
resolve();
});
Expand All @@ -109,7 +91,3 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
setValue: (value: number) => (sync as SemaphoreInterface).setValue(value),
};
}

function isSemaphore(sync: SemaphoreInterface | MutexInterface): sync is SemaphoreInterface {
return (sync as SemaphoreInterface).getValue !== undefined;
}
24 changes: 12 additions & 12 deletions test/mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo
const values: number[] = [];

// Scheduled immediately
mutex.acquire(0).then((release) => {
mutex.acquire({ priority: 0 }).then((release) => {
values.push(0);
setTimeout(release, 100)
});

// Low priority task
mutex.acquire(-1).then((release) => {
mutex.acquire({ priority: -1 }).then((release) => {
values.push(-1);
setTimeout(release, 100)
});

// High priority task; jumps the queue
mutex.acquire(1).then((release) => {
mutex.acquire({ priority: 1 }).then((release) => {
values.push(1);
setTimeout(release, 100)
});
Expand Down Expand Up @@ -110,9 +110,9 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo

test('runExclusive unblocks the highest-priority task first', async () => {
const values: number[] = [];
mutex.runExclusive(() => { values.push(0); }, 0);
mutex.runExclusive(() => { values.push(-1); }, -1);
mutex.runExclusive(() => { values.push(+1); }, +1);
mutex.runExclusive(() => { values.push(0); }, { priority: 0 });
mutex.runExclusive(() => { values.push(-1); }, { priority: -1 });
mutex.runExclusive(() => { values.push(+1); }, { priority: +1 });
await clock.runAllAsync();
assert.deepStrictEqual(values, [0, +1, -1]);
});
Expand Down Expand Up @@ -303,20 +303,20 @@ export const mutexSuite = (factory: (cancelError?: Error) => MutexInterface): vo
});

test('waitForUnlock unblocks high-priority waiters before low-priority queued tasks', async () => {
mutex.acquire(0); // Immediately scheduled
mutex.acquire(0); // Waiting
mutex.acquire({ priority: 0 }); // Immediately scheduled
mutex.acquire({ priority: 0 }); // Waiting
let flag = false;
mutex.waitForUnlock(1).then(() => { flag = true; });
mutex.waitForUnlock({ priority: 1 }).then(() => { flag = true; });
mutex.release();
await clock.tickAsync(0);
assert.strictEqual(flag, true);
});

test('waitForUnlock unblocks low-priority waiters after high-priority queued tasks', async () => {
mutex.acquire(0); // Immediately scheduled
mutex.acquire(0); // Waiting
mutex.acquire({ priority: 0 }); // Immediately scheduled
mutex.acquire({ priority: 0 }); // Waiting
let flag = false;
mutex.waitForUnlock(-1).then(() => { flag = true; });
mutex.waitForUnlock({ priority: -1 }).then(() => { flag = true; });
mutex.release();
await clock.tickAsync(0);
assert.strictEqual(flag, false);
Expand Down
Loading