@@ -54,10 +54,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
5454
5555 private isUploadingCrud : boolean ;
5656
57+ protected _isConnected : boolean ;
58+
5759 constructor ( options : AbstractStreamingSyncImplementationOptions ) {
5860 super ( ) ;
5961 this . options = { ...DEFAULT_STREAMING_SYNC_OPTIONS , ...options } ;
6062 this . isUploadingCrud = false ;
63+ this . _isConnected = false ;
6164 }
6265
6366 get lastSyncedAt ( ) {
@@ -68,6 +71,10 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
6871 return this . options . logger ! ;
6972 }
7073
74+ get isConnected ( ) {
75+ return this . _isConnected ;
76+ }
77+
7178 abstract obtainLock < T > ( lockOptions : LockOptions < T > ) : Promise < T > ;
7279
7380 async hasCompletedSync ( ) {
@@ -163,6 +170,9 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
163170 } ,
164171 signal
165172 ) ) {
173+ // A connection is active and messages are being received
174+ this . updateSyncStatus ( true ) ;
175+
166176 if ( isStreamingSyncCheckpoint ( line ) ) {
167177 targetCheckpoint = line . checkpoint ;
168178 const bucketsToDelete = new Set < string > ( bucketSet ) ;
@@ -289,7 +299,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
289299 }
290300
291301 private updateSyncStatus ( connected : boolean , lastSyncedAt ?: Date ) {
302+ const takeSnapShot = ( ) => [ this . _isConnected , this . _lastSyncedAt ?. valueOf ( ) ] ;
303+
304+ const previousValues = takeSnapShot ( ) ;
292305 this . _lastSyncedAt = lastSyncedAt ?? this . lastSyncedAt ;
293- this . iterateListeners ( ( cb ) => cb . statusChanged ?.( new SyncStatus ( connected , this . lastSyncedAt ) ) ) ;
306+ this . _isConnected = connected ;
307+ if ( ! _ . isEqual ( previousValues , takeSnapShot ( ) ) ) {
308+ this . iterateListeners ( ( cb ) => cb . statusChanged ?.( new SyncStatus ( this . isConnected , this . lastSyncedAt ) ) ) ;
309+ }
294310 }
295311}
0 commit comments