forked from hplush/slowreader
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.ts
More file actions
84 lines (78 loc) · 1.88 KB
/
queue.ts
File metadata and controls
84 lines (78 loc) · 1.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Constraint for the map describing what a queue can process: every property
 * name is a task type and the property's value type is that task's payload.
 */
type TaskTypes = object
/**
 * Union of all tasks a queue can hold. Each property of `Types` becomes one
 * member of the union: `type` is the property name and `payload` has
 * the property's value type.
 */
type Task<Types extends TaskTypes> = {
  [Name in keyof Types]: {
    payload: Types[Name]
    type: Name
  }
}[keyof Types]
/**
 * Handlers for every task type. The handler stored under a task type
 * receives the payload of such a task together with the queue's task array
 * and returns a promise which settles when the task is finished.
 */
type QueueProcessor<Types extends TaskTypes> = {
  [Name in keyof Types]: (
    payload: Types[Name],
    tasks: Task<Types>[]
  ) => Promise<void>
}
/**
 * Queue handle returned by `createQueue`.
 */
export interface Queue<Types extends TaskTypes> {
  /**
   * Start `workers` concurrent workers. Each worker repeatedly takes
   * the first task from `tasks` and passes it to the processor registered
   * under the task's `type`.
   *
   * @param workers How many tasks may be processed at the same time.
   * @param processors Handler for every task type.
   * @returns Promise which resolves when all workers have finished.
   */
  start(workers: number, processors: QueueProcessor<Types>): Promise<void>
  /**
   * Ask workers not to take any more tasks from the queue.
   */
  stop(): void
  /**
   * Tasks which have not been taken by a worker yet.
   */
  tasks: Task<Types>[]
}
/**
 * Process a big queue of tasks with a limited number of concurrent workers.
 *
 * It is used in the feeds refresh and was moved to a separate abstraction
 * to simplify the refreshing code.
 *
 * The queue does not retry failed tasks by itself: a processor that wants
 * retries has to do them on its own (for instance, with `retryOnError`).
 * If a processor rejects, the promise returned by `start()` rejects with
 * the same error, while the other workers are not stopped.
 *
 * @param tasks Initial tasks. The array is not copied: workers remove tasks
 *              from its beginning, the same array is passed to every
 *              processor (so a processor can add follow-up tasks) and it is
 *              exposed as `queue.tasks`.
 * @returns Queue handle. `start()` resolves when every worker has run out of
 *          tasks or the queue was stopped. `stop()` lets running tasks
 *          finish, but prevents workers from taking new ones; the flag is
 *          never reset, so a stopped queue will not process tasks again.
 */
export function createQueue<Types extends TaskTypes>(
  tasks: Task<Types>[] = []
): Queue<Types> {
  let stopped = false
  return {
    async start(workers, processors) {
      // A plain loop instead of `await worker()` recursion: recursion keeps
      // one pending async call per processed task alive until the queue is
      // drained, which grows without limit on big queues.
      async function worker(): Promise<void> {
        while (!stopped) {
          let task = tasks.shift()
          // Nothing left for this worker. Other workers may still be busy
          // and may add new tasks, but this worker is done.
          if (!task) return
          await processors[task.type](task.payload, tasks)
        }
      }
      let promises: Promise<void>[] = []
      for (let i = 0; i < workers; i++) {
        promises.push(worker())
      }
      await Promise.all(promises)
    },
    stop() {
      stopped = true
    },
    tasks
  }
}
/* node:coverage disable */
/**
 * Call `cb` and call it again if it fails, up to `attempts` calls in total.
 *
 * @param cb Async operation to run.
 * @param onFirstError Called once, with the error of the first failed call,
 *                     and only if another attempt is going to follow.
 * @param attempts Maximum number of `cb` calls. `cb` is always called
 *                 at least once, so values below 1 behave like 1.
 * @returns The result of the first successful call; `'abort'` as soon as
 *          `cb` throws an `Error` named `AbortError` (no more attempts are
 *          made); `'error'` when all attempts have failed.
 * @throws Anything thrown by `cb` that is not an `Error` instance is
 *         rethrown without retrying.
 */
export async function retryOnError<Result>(
  cb: () => Promise<Result>,
  onFirstError: (e: Error) => void,
  attempts = 3
): Promise<'abort' | 'error' | Result> {
  let remaining = attempts
  let firstFailure = true
  while (true) {
    try {
      return await cb()
    } catch (e) {
      if (!(e instanceof Error)) throw e
      if (e.name === 'AbortError') return 'abort'
      remaining -= 1
      // `<=` instead of `===`: zero, negative or fractional `attempts`
      // would step over 0 and retry a failing callback forever.
      if (remaining <= 0) return 'error'
      if (firstFailure) {
        firstFailure = false
        onFirstError(e)
      }
    }
  }
}
/* node:coverage enable */