@@ -136,78 +136,86 @@ class JsonArrayStreamer<T> {
136
136
}
137
137
138
138
public async * stream ( chunkSize : number , filter ?: ( element : T ) => boolean ) {
139
- characterStream: for await ( const chunk of this . chunkGenerator ( ) ) {
140
- for ( let char of chunk ) {
141
- if ( ! this . rootDetected ) {
142
- if (
143
- ! [
144
- CHARACTER . SPACE ,
145
- CHARACTER . NEW_LINE ,
146
- CHARACTER . BRACKET . OPEN ,
147
- ] . includes ( char )
148
- )
149
- throw new Error ( ERRORS . INVALID_FILE ) ;
150
-
151
- this . rootDetected = char === CHARACTER . BRACKET . OPEN ;
152
- continue ;
153
- }
139
+ try {
140
+ characterStream: for await ( const chunk of this . chunkGenerator ( ) ) {
141
+ for ( let char of chunk ) {
142
+ if ( ! this . rootDetected ) {
143
+ if (
144
+ ! [
145
+ CHARACTER . SPACE ,
146
+ CHARACTER . NEW_LINE ,
147
+ CHARACTER . BRACKET . OPEN ,
148
+ ] . includes ( char )
149
+ )
150
+ throw new Error ( ERRORS . INVALID_FILE ) ;
151
+
152
+ this . rootDetected = char === CHARACTER . BRACKET . OPEN ;
153
+ continue ;
154
+ }
154
155
155
- if ( ! this . elementDetected ) {
156
- if ( char === CHARACTER . BRACKET . CLOSE ) break characterStream;
156
+ if ( ! this . elementDetected ) {
157
+ if ( char === CHARACTER . BRACKET . CLOSE ) break characterStream;
157
158
158
- this . elementDetected = ! [
159
- CHARACTER . SPACE ,
160
- CHARACTER . COMMA ,
161
- CHARACTER . NEW_LINE ,
162
- ] . includes ( char ) ;
163
- }
159
+ this . elementDetected = ! [
160
+ CHARACTER . SPACE ,
161
+ CHARACTER . COMMA ,
162
+ CHARACTER . NEW_LINE ,
163
+ ] . includes ( char ) ;
164
+ }
164
165
165
- if ( this . elementDetected ) {
166
- if ( ! this . elementParser ) {
167
- if ( char === CHARACTER . BRACKET . OPEN ) {
168
- this . elementType = "array" ;
169
- this . elementParser = this . containerElementParser ;
170
- } else if ( char === CHARACTER . BRACE . OPEN ) {
171
- this . elementType = "object" ;
172
- this . elementParser = this . containerElementParser ;
173
- } else if ( char === CHARACTER . QUOTE ) {
174
- this . elementType = "string" ;
175
- this . elementParser = this . stringElementParser ;
176
- } else {
177
- this . elementType = "others" ;
178
- this . elementParser = this . primitiveElementParser ;
166
+ if ( this . elementDetected ) {
167
+ if ( ! this . elementParser ) {
168
+ if ( char === CHARACTER . BRACKET . OPEN ) {
169
+ this . elementType = "array" ;
170
+ this . elementParser = this . containerElementParser ;
171
+ } else if ( char === CHARACTER . BRACE . OPEN ) {
172
+ this . elementType = "object" ;
173
+ this . elementParser = this . containerElementParser ;
174
+ } else if ( char === CHARACTER . QUOTE ) {
175
+ this . elementType = "string" ;
176
+ this . elementParser = this . stringElementParser ;
177
+ } else {
178
+ this . elementType = "others" ;
179
+ this . elementParser = this . primitiveElementParser ;
180
+ }
181
+ } else if (
182
+ this . elementParser === this . primitiveElementParser &&
183
+ char === CHARACTER . BRACKET . CLOSE
184
+ ) {
185
+ break characterStream;
179
186
}
180
- } else if (
181
- this . elementParser === this . primitiveElementParser &&
182
- char === CHARACTER . BRACKET . CLOSE
183
- ) {
184
- break characterStream;
185
- }
186
187
187
- this . elementParser ( char , filter ) ;
188
+ this . elementParser ( char , filter ) ;
188
189
189
- if ( this . resultBuffer . length === chunkSize ) {
190
- if ( ! this . readStream ?. closed ) this . readStream ?. pause ( ) ;
191
- yield this . resultBuffer . splice ( 0 , chunkSize ) ;
192
- if ( ! this . readStream ?. closed ) this . readStream ?. resume ( ) ;
190
+ if ( this . resultBuffer . length === chunkSize ) {
191
+ if ( ! this . readStream ?. closed ) this . readStream ?. pause ( ) ;
192
+ yield this . resultBuffer . splice ( 0 , chunkSize ) ;
193
+ if ( ! this . readStream ?. closed ) this . readStream ?. resume ( ) ;
194
+ }
193
195
}
194
196
}
195
197
}
196
- }
197
198
198
- this . readStream ?. close ( ) ;
199
- this . readStream = null ;
199
+ this . readStream ?. close ( ) ;
200
+ this . readStream = null ;
200
201
201
- if ( this . chunkBuffer . length ) {
202
- const element = < T > this . getParsedElement ( ) ;
203
- this . addToResult ( element , filter ) ;
202
+ if ( this . chunkBuffer . length ) {
203
+ const element = < T > this . getParsedElement ( ) ;
204
+ this . addToResult ( element , filter ) ;
205
+ this . resetParser ( ) ;
206
+ }
207
+ if ( this . resultBuffer . length ) {
208
+ yield this . resultBuffer . splice ( 0 ) ;
209
+ }
210
+
211
+ return this . resultBuffer ;
212
+ } catch ( error ) {
213
+ this . readStream ?. close ( ) ;
204
214
this . resetParser ( ) ;
215
+ this . resultBuffer = [ ] ;
216
+ this . readStream = null ;
217
+ throw error ;
205
218
}
206
- if ( this . resultBuffer . length ) {
207
- yield this . resultBuffer . splice ( 0 ) ;
208
- }
209
-
210
- return this . resultBuffer ;
211
219
}
212
220
213
221
private static sanitizeReadStreamOptions = (
0 commit comments