@@ -197,7 +197,8 @@ async function* streamResponseInner(
     raw_data,
     binary_data,
     signal,
-    tracker
+    tracker,
+    user_id: syncParams.user_id
   });

   await new Promise((resolve) => setTimeout(resolve, 10));
@@ -214,6 +215,7 @@ interface BucketDataRequest {
   binary_data: boolean | undefined;
   tracker: RequestTracker;
   signal: AbortSignal;
+  user_id?: string;
 }

 async function* bucketDataInBatches(request: BucketDataRequest) {
@@ -262,8 +264,19 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
   const checkpointOp = BigInt(checkpoint);
   let checkpointInvalidated = false;

-  const [_, release] = await syncSemaphore.acquire();
+  if (syncSemaphore.isLocked()) {
+    logger.info('Sync concurrency limit reached, waiting for lock', { user_id: request.user_id });
+  }
+  const [value, release] = await syncSemaphore.acquire();
   try {
+    if (value <= 3) {
+      // This can be noisy, so we only log when we get close to the
+      // concurrency limit.
+      logger.info(`Got sync lock. Slots available: ${value - 1}`, {
+        user_id: request.user_id,
+        sync_data_slots: value - 1
+      });
+    }
     // Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint
     // For the first batch, this will be all buckets.
     const filteredBuckets = new Map(bucketsToFetch.map((bucket) => [bucket, dataBuckets.get(bucket)!]));
@@ -331,6 +344,13 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
       }
     }
   } finally {
+    if (value <= 3) {
+      // This can be noisy, so we only log when we get close to the
+      // concurrency limit.
+      logger.info(`Releasing sync lock`, {
+        user_id: request.user_id
+      });
+    }
     release();
   }
 }
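For context on the concurrency control above: the `[value, release]` tuple and the `isLocked()` check match the API of the async-mutex `Semaphore`, where `acquire()` resolves with the semaphore's current value plus a release callback, so `value - 1` is the number of slots still free once this request holds one. Below is a minimal sketch of how such a semaphore could be set up and used; the limit of 10, the `withSyncSlot` helper, and the `console.info` logging are illustrative assumptions, not code from this PR.

```ts
import { Semaphore } from 'async-mutex';

// Illustrative concurrency cap; the real service configures its own limit.
const SYNC_CONCURRENCY_LIMIT = 10;
const syncSemaphore = new Semaphore(SYNC_CONCURRENCY_LIMIT);

// Hypothetical wrapper showing the acquire/log/release pattern from the diff.
async function withSyncSlot<T>(user_id: string | undefined, work: () => Promise<T>): Promise<T> {
  if (syncSemaphore.isLocked()) {
    // Every slot is taken; acquire() below queues until one is released.
    console.info('Sync concurrency limit reached, waiting for lock', { user_id });
  }

  // acquire() resolves with the semaphore's current value and a release callback;
  // value - 1 is the number of slots still free after taking this one.
  const [value, release] = await syncSemaphore.acquire();
  try {
    if (value <= 3) {
      // Only log when nearly at the limit, mirroring the noise-reduction in the diff.
      console.info(`Got sync lock. Slots available: ${value - 1}`, { user_id });
    }
    return await work();
  } finally {
    release();
  }
}
```

Gating the log on `value <= 3` (as the diff's comments note) keeps log volume low under normal load while still surfacing which users are contending for slots as the service approaches its concurrency limit.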