@@ -196,7 +196,8 @@ async function* streamResponseInner(
     raw_data,
     binary_data,
     signal,
-    tracker
+    tracker,
+    user_id: syncParams.user_id
   });

   await new Promise((resolve) => setTimeout(resolve, 10));
@@ -213,6 +214,7 @@ interface BucketDataRequest {
   binary_data: boolean | undefined;
   tracker: RequestTracker;
   signal: AbortSignal;
+  user_id?: string;
 }

 async function* bucketDataInBatches(request: BucketDataRequest) {
@@ -261,8 +263,19 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
   const checkpointOp = BigInt(checkpoint);
   let checkpointInvalidated = false;

-  const [_, release] = await syncSemaphore.acquire();
+  if (syncSemaphore.isLocked()) {
+    logger.info('Sync concurrency limit reached, waiting for lock', { user_id: request.user_id });
+  }
+  const [value, release] = await syncSemaphore.acquire();
   try {
+    if (value <= 3) {
+      // This can be noisy, so we only log when we get close to the
+      // concurrency limit.
+      logger.info(`Got sync lock. Slots available: ${value - 1}`, {
+        user_id: request.user_id,
+        sync_data_slots: value - 1
+      });
+    }
     // Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint
     // For the first batch, this will be all buckets.
     const filteredBuckets = new Map(bucketsToFetch.map((bucket) => [bucket, dataBuckets.get(bucket)!]));
@@ -330,6 +343,13 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
       }
     }
   } finally {
+    if (value <= 3) {
+      // This can be noisy, so we only log when we get close to the
+      // concurrency limit.
+      logger.info(`Releasing sync lock`, {
+        user_id: request.user_id
+      });
+    }
     release();
   }
 }
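For context, here is a minimal sketch of how the `syncSemaphore` and `logger` used in the diff might be wired up. It assumes the `async-mutex` `Semaphore` (whose `acquire()` resolves to a `[value, release]` tuple and which exposes `isLocked()`) and a winston-style logger; the concurrency limit, the helper function, and all names below are illustrative and are not taken from this PR.

```ts
// Illustrative sketch only: assumes async-mutex and winston; not part of this PR.
import { Semaphore } from 'async-mutex';
import * as winston from 'winston';

const logger = winston.createLogger({
  transports: [new winston.transports.Console()]
});

// Hypothetical limit of 10 concurrent bucket-data fetches.
const syncSemaphore = new Semaphore(10);

// Hypothetical helper showing the acquire/log/release pattern from the diff.
async function withSyncLock<T>(user_id: string | undefined, fn: () => Promise<T>): Promise<T> {
  if (syncSemaphore.isLocked()) {
    // All slots are taken; callers queue here until one is released.
    logger.info('Sync concurrency limit reached, waiting for lock', { user_id });
  }
  const [value, release] = await syncSemaphore.acquire();
  try {
    logger.info(`Got sync lock. Slots available: ${value - 1}`, { user_id });
    return await fn();
  } finally {
    logger.info('Releasing sync lock', { user_id });
    release();
  }
}
```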