@@ -20,7 +20,10 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
2020 * @param emitter 事象発生器への参照
2121 * @param resolvers 反復結果解決関数の配列への参照
2222 */
23- constructor ( emitter : EventEmitter , resolvers : IteratorResultResolver < T > [ ] ) {
23+ constructor (
24+ emitter : EventEmitter ,
25+ resolvers : IteratorResultResolver < T > [ ] ,
26+ ) {
2427 this . #emitter = emitter
2528 this . #resolvers = resolvers
2629 }
@@ -30,10 +33,14 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
3033 * @returns 次の要素
3134 */
3235 next ( ) : Promise < IteratorResult < T > > {
33- return new Promise ( ( resolve : IteratorResultResolver < T > ) => (
34- this . #resolvers. push ( resolve ) ,
35- this . #emitter. emit ( 'deq' )
36- ) )
36+ return new Promise (
37+ (
38+ resolve : IteratorResultResolver < T > ,
39+ ) => {
40+ this . #resolvers. push ( resolve )
41+ this . #emitter. emit ( 'deq' )
42+ }
43+ )
3744 }
3845}
3946
@@ -75,37 +82,62 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
7582 */
7683 constructor ( ) {
7784 this . #state[ 0 ] = AIQState . undefined
78- const resolveAsync = createAsyncResolver ( {
79- finish : ( ) => Atomics . exchange ( this . #state, 0 , AIQState . finished ) ,
80- resolvers : this . #resolvers,
81- } )
82- this . #emitter. on ( 'deq' , async ( ) => {
83- while ( this . #queue. length && this . #resolvers. length )
84- await resolveAsync ( this . #queue. shift ( ) )
85- } )
86- this . #emitter. on ( 'enq' , async ( value : Terminatable < T > ) =>
87- this . #resolvers. length ?
88- await resolveAsync ( value ) :
89- this . #queue. push ( value )
85+ const resolveAsync = createAsyncResolver (
86+ {
87+ finish : ( ) =>
88+ Atomics . exchange (
89+ this . #state,
90+ 0 ,
91+ AIQState . finished ,
92+ ) ,
93+ resolvers : this . #resolvers,
94+ }
95+ )
96+ this . #emitter. on (
97+ 'deq' ,
98+ async ( ) => {
99+ while ( this . #queue. length
100+ && this . #resolvers. length )
101+ await resolveAsync (
102+ this . #queue. shift ( )
103+ )
104+ }
105+ )
106+ this . #emitter. on (
107+ 'enq' ,
108+ async ( value : Terminatable < T > ) =>
109+ this . #resolvers. length
110+ ? await resolveAsync ( value )
111+ : this . #queue. push ( value )
90112 )
91113 }
92114
93115 /**
94116 * この待ち行列への要素の追加を終了する
95117 * @param cb 終端が読み取られた後に呼ばれるコールバック関数
96118 */
97- end ( cb ?: NoParameterCallback ) : Promise < void > {
119+ end (
120+ cb ?: NoParameterCallback ,
121+ ) : Promise < void > {
98122 return new Promise (
99- ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
123+ (
124+ resolve : Resolver < void > ,
125+ reject : SingleParameterAction < unknown > ,
126+ ) => {
100127 const state = Atomics . compareExchange (
101128 this . #state,
102129 0 ,
103130 AIQState . undefined ,
104131 AIQState . ending ,
105132 )
106133 if ( state !== AIQState . undefined )
107- return reject ( new Error ( AIQState [ state ] ) )
108- this . #emitter. emit ( 'enq' , new Terminator ( cb ) )
134+ return reject (
135+ new Error ( AIQState [ state ] )
136+ )
137+ this . #emitter. emit (
138+ 'enq' ,
139+ new Terminator ( cb ) ,
140+ )
109141 return resolve ( )
110142 }
111143 )
@@ -117,11 +149,22 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
117149 */
118150 push ( value : T ) : Promise < void > {
119151 return new Promise (
120- ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
121- const state = Atomics . load ( this . #state, 0 )
152+ (
153+ resolve : Resolver < void > ,
154+ reject : SingleParameterAction < unknown > ,
155+ ) => {
156+ const state = Atomics . load (
157+ this . #state,
158+ 0 ,
159+ )
122160 if ( state !== AIQState . undefined )
123- return reject ( new Error ( AIQState [ state ] ) )
124- this . #emitter. emit ( 'enq' , value )
161+ return reject (
162+ new Error ( AIQState [ state ] )
163+ )
164+ this . #emitter. emit (
165+ 'enq' ,
166+ value ,
167+ )
125168 return resolve ( )
126169 }
127170 )
@@ -132,7 +175,10 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
132175 * @returns 非同期反復子
133176 */
134177 [ Symbol . asyncIterator ] ( ) : AsyncIterator < T > {
135- return new AIQAsyncIterator ( this . #emitter, this . #resolvers)
178+ return new AIQAsyncIterator (
179+ this . #emitter,
180+ this . #resolvers,
181+ )
136182 }
137183}
138184
@@ -155,22 +201,26 @@ type AsyncResolverCreateParameter<T> = {
155201/**
156202 * 反復結果解決関数型
157203 */
158- type IteratorResultResolver < T > = Resolver < IteratorResult < T > >
204+ type IteratorResultResolver < T > =
205+ Resolver < IteratorResult < T > >
159206
160207/**
161208 * 引数無しコールバック関数型
162209 */
163- type NoParameterCallback = ( ) => PromiseLike < void > | void
210+ type NoParameterCallback =
211+ ( ) => PromiseLike < void > | void
164212
165213/**
166214 * 解決関数型
167215 */
168- type Resolver < T > = SingleParameterAction < T >
216+ type Resolver < T > =
217+ SingleParameterAction < T >
169218
170219/**
171220 * 引数1個の関数型
172221 */
173- type SingleParameterAction < T > = ( arg : T ) => void
222+ type SingleParameterAction < T > =
223+ ( arg : T ) => void
174224
175225/**
176226 * 終端
@@ -180,59 +230,80 @@ class Terminator {
180230 * コンストラクタ
181231 * @param cb コールバック関数
182232 */
183- constructor ( private readonly cb ?: NoParameterCallback ) {
233+ constructor (
234+ private readonly cb ?: NoParameterCallback ,
235+ ) {
184236 }
185237
186238 /**
187239 * コールバック関数を呼び出す
188240 */
189241 call ( ) : Promise < void > {
190- return new Promise ( (
191- resolve : Resolver < void > ,
192- reject : SingleParameterAction < unknown > ,
193- ) => {
194- if ( this . cb )
195- try {
196- const result = this . cb ( )
197- if ( result instanceof Promise )
198- return result . catch ( reject ) . then ( resolve )
199- }
200- catch ( err : unknown ) {
201- return reject ( err )
202- }
203- return resolve ( )
204- } )
242+ return new Promise (
243+ (
244+ resolve : Resolver < void > ,
245+ reject : SingleParameterAction < unknown > ,
246+ ) => {
247+ if ( this . cb )
248+ try {
249+ const result = this . cb ( )
250+ if ( result instanceof Promise )
251+ return result . catch ( reject ) . then ( resolve )
252+ }
253+ catch ( err : unknown ) {
254+ return reject ( err )
255+ }
256+ return resolve ( )
257+ }
258+ )
205259 }
206260}
207261
208262/**
209263 * 終端可能型
210264 */
211- type Terminatable < T > = Terminator | T
265+ type Terminatable < T > =
266+ Terminator | T
212267
213268/**
214269 * 反復結果解決関数を非同期的に処理する関数を作成する
215270 * @param param パラメータ
216271 * @returns 反復結果解決関数を非同期的に処理する関数を返す
217272 */
218- const createAsyncResolver = < T > ( param : AsyncResolverCreateParameter < T > ) => {
219- const resolveAsync = ( result : IteratorResult < T > ) =>
220- new Promise ( ( callback : Resolver < void > ) => {
221- const resolver = param . resolvers . shift ( )
222- resolver ( result )
223- callback ( )
224- } )
225- return async ( value : Terminatable < T > ) => {
273+ const createAsyncResolver = < T > (
274+ param : AsyncResolverCreateParameter < T > ,
275+ ) => {
276+ const resolveAsync = (
277+ result : IteratorResult < T > ,
278+ ) =>
279+ new Promise (
280+ (
281+ callback : Resolver < void > ,
282+ ) => {
283+ const resolver = param . resolvers . shift ( )
284+ resolver ( result )
285+ callback ( )
286+ }
287+ )
288+ return async (
289+ value : Terminatable < T > ,
290+ ) => {
226291 if ( value instanceof Terminator ) {
227292 const state = param . finish ( )
228- await resolveAsync ( { done : true } as IteratorResult < T > )
293+ await resolveAsync (
294+ {
295+ done : true
296+ } as IteratorResult < T >
297+ )
229298 assert ( state === AIQState . ending )
230299 await value . call ( )
231300 }
232301 else
233- await resolveAsync ( {
234- done : false ,
235- value,
236- } )
302+ await resolveAsync (
303+ {
304+ done : false ,
305+ value,
306+ }
307+ )
237308 }
238309}
0 commit comments