11import { EventEmitter } from 'stream'
2+ import { assert } from 'console'
23
34/**
45 * 非同期反復可能な先入れ先出し型の待ち行列への非同期反復子
@@ -39,7 +40,11 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
3940/**
4041 * 非同期反復可能な先入れ先出し型の待ち行列の状態を表す型
4142 */
42- type AIQState = 'ending' | 'finished'
43+ enum AIQState {
44+ ending = 1 ,
45+ finished = 2 ,
46+ undefined = 0 ,
47+ }
4348
4449/**
4550 * 非同期反復可能な先入れ先出し型の待ち行列
@@ -63,18 +68,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
6368 /**
6469 * この待ち行列の現在の状態
6570 */
66- #state?: AIQState
71+ readonly #state = new Uint8Array ( [ AIQState . undefined ] )
6772
6873 /**
6974 * コンストラクタ
7075 */
7176 constructor ( ) {
7277 const resolveAsync = createAsyncResolver ( {
73- finish : ( ) => {
74- const state = this . #state
75- this . #state = 'finished'
76- return state
77- } ,
78+ finish : ( ) => Atomics . exchange ( this . #state, 0 , AIQState . finished ) ,
7879 resolvers : this . #resolvers,
7980 } )
8081 this . #emitter. on ( 'deq' , async ( ) => {
@@ -95,10 +96,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
9596 end ( cb ?: NoParameterCallback ) : Promise < void > {
9697 return new Promise (
9798 ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
98- const state = this . #state
99- if ( state )
100- return reject ( new Error ( state ) )
101- this . #state = 'ending'
99+ const state = Atomics . compareExchange (
100+ this . #state,
101+ 0 ,
102+ AIQState . undefined ,
103+ AIQState . ending ,
104+ )
105+ if ( state !== AIQState . undefined )
106+ return reject ( new Error ( AIQState [ state ] ) )
102107 this . #emitter. emit ( 'enq' , new Terminator ( cb ) )
103108 return resolve ( )
104109 }
@@ -112,9 +117,9 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
112117 push ( value : T ) : Promise < void > {
113118 return new Promise (
114119 ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
115- const state = this . #state
116- if ( state )
117- return reject ( new Error ( state ) )
120+ const state = Atomics . load ( this . #state, 0 )
121+ if ( state !== AIQState . undefined )
122+ return reject ( new Error ( AIQState [ state ] ) )
118123 this . #emitter. emit ( 'enq' , value )
119124 return resolve ( )
120125 }
@@ -220,8 +225,8 @@ const createAsyncResolver = <T>(param: AsyncResolverCreateParameter<T>) => {
220225 if ( value instanceof Terminator ) {
221226 const state = param . finish ( )
222227 await resolveAsync ( { done : true } as IteratorResult < T > )
223- if ( state === ' ending' )
224- await value . call ( )
228+ assert ( state === AIQState . ending )
229+ await value . call ( )
225230 }
226231 else
227232 await resolveAsync ( {
0 commit comments