Skip to content

Commit 0908ecc

Browse files
committed
Update subscription state while offline
1 parent fe5969a commit 0908ecc

File tree

3 files changed

+32
-21
lines changed

3 files changed

+32
-21
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import 'dart:async';
2-
import 'dart:convert';
32

43
import 'package:async/async.dart';
54
import 'package:logging/logging.dart';
@@ -16,8 +15,6 @@ import 'package:powersync_core/src/schema.dart';
1615
import 'package:powersync_core/src/schema_logic.dart';
1716
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1817
import 'package:powersync_core/src/sync/connection_manager.dart';
19-
import 'package:powersync_core/src/sync/instruction.dart';
20-
import 'package:powersync_core/src/sync/mutable_sync_status.dart';
2118
import 'package:powersync_core/src/sync/options.dart';
2219
import 'package:powersync_core/src/sync/sync_status.dart';
2320

@@ -109,7 +106,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
109106
await _checkVersion();
110107
await database.execute('SELECT powersync_init()');
111108
await updateSchema(schema);
112-
await _updateHasSynced();
109+
await _connections.resolveOfflineSyncStatus();
113110
}
114111

115112
/// Check that a supported version of the powersync extension is loaded.
@@ -135,23 +132,6 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
135132
return isInitialized;
136133
}
137134

138-
Future<List<SyncStream>> get subscribedStreams {
139-
throw UnimplementedError();
140-
}
141-
142-
Future<void> _updateHasSynced() async {
143-
// Query the database to see if any data has been synced.
144-
final row = await database.get(
145-
'SELECT powersync_offline_sync_status() AS r;',
146-
);
147-
148-
final status = CoreSyncStatus.fromJson(
149-
json.decode(row['r'] as String) as Map<String, Object?>);
150-
151-
setStatus((MutableSyncStatus()..applyFromCore(status))
152-
.immutableSnapshot(setLastSynced: true));
153-
}
154-
155135
/// Returns a [Future] which will resolve once at least one full sync cycle
156136
/// has completed (meaninng that the first consistent checkpoint has been
157137
/// reached across all buckets).

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import 'package:powersync_core/src/sync/options.dart';
1010
import 'package:powersync_core/src/sync/stream.dart';
1111
import 'package:powersync_core/src/sync/sync_status.dart';
1212

13+
import 'instruction.dart';
14+
import 'mutable_sync_status.dart';
1315
import 'streaming_sync.dart';
1416

1517
/// A (stream name, JSON parameters) pair that uniquely identifies a stream
@@ -252,6 +254,15 @@ final class ConnectionManager {
252254
'priority': priority,
253255
},
254256
});
257+
258+
await _activeGroup.syncMutex.lock(() async {
259+
if (_abortActiveSync == null) {
260+
// Since we're not connected, update the offline sync status to reflect
261+
// the new subscription.
262+
// With a connection, the sync client would include it in its state.
263+
await resolveOfflineSyncStatus();
264+
}
265+
});
255266
}
256267

257268
Future<void> unsubscribeAll({
@@ -266,6 +277,18 @@ final class ConnectionManager {
266277
});
267278
}
268279

280+
Future<void> resolveOfflineSyncStatus() async {
281+
final row = await db.database.get(
282+
'SELECT powersync_offline_sync_status() AS r;',
283+
);
284+
285+
final status = CoreSyncStatus.fromJson(
286+
json.decode(row['r'] as String) as Map<String, Object?>);
287+
288+
manuallyChangeSyncStatus((MutableSyncStatus()..applyFromCore(status))
289+
.immutableSnapshot(setLastSynced: true));
290+
}
291+
269292
SyncStream syncStream(String name, Map<String, Object?>? parameters) {
270293
return _SyncStreamImplementation(this, name, parameters);
271294
}

packages/powersync_core/test/sync/stream_test.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,12 @@ void main() {
211211
await pumpEventQueue();
212212
expect(syncService.controller.hasListener, isTrue);
213213
});
214+
215+
test('subscriptions update while offline', () async {
216+
final stream = StreamQueue(database.statusStream);
217+
218+
final subscription = await database.syncStream('foo').subscribe();
219+
var status = await stream.next;
220+
expect(status.statusFor(subscription), isNotNull);
221+
});
214222
}

0 commit comments

Comments
 (0)