Skip to content

Commit 9319400

Browse files
committed
Properly update subscriptions
1 parent 7b0220c commit 9319400

File tree

6 files changed

+62
-44
lines changed

6 files changed

+62
-44
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23

34
import 'package:async/async.dart';
45
import 'package:logging/logging.dart';
@@ -15,6 +16,8 @@ import 'package:powersync_core/src/schema.dart';
1516
import 'package:powersync_core/src/schema_logic.dart';
1617
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1718
import 'package:powersync_core/src/sync/connection_manager.dart';
19+
import 'package:powersync_core/src/sync/instruction.dart';
20+
import 'package:powersync_core/src/sync/mutable_sync_status.dart';
1821
import 'package:powersync_core/src/sync/options.dart';
1922
import 'package:powersync_core/src/sync/sync_status.dart';
2023

@@ -138,53 +141,26 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
138141

139142
Future<void> _updateHasSynced() async {
140143
// Query the database to see if any data has been synced.
141-
final result = await database.getAll(
142-
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority;',
144+
final row = await database.get(
145+
'SELECT powersync_offline_sync_status() AS r;',
143146
);
144-
const prioritySentinel = 2147483647;
145-
var hasSynced = false;
146-
DateTime? lastCompleteSync;
147-
final priorityStatusEntries = <SyncPriorityStatus>[];
148147

149-
DateTime parseDateTime(String sql) {
150-
return DateTime.parse('${sql}Z').toLocal();
151-
}
152-
153-
for (final row in result) {
154-
final priority = row.columnAt(0) as int;
155-
final lastSyncedAt = parseDateTime(row.columnAt(1) as String);
156-
157-
if (priority == prioritySentinel) {
158-
hasSynced = true;
159-
lastCompleteSync = lastSyncedAt;
160-
} else {
161-
priorityStatusEntries.add((
162-
hasSynced: true,
163-
lastSyncedAt: lastSyncedAt,
164-
priority: BucketPriority(priority)
165-
));
166-
}
167-
}
148+
final status = CoreSyncStatus.fromJson(
149+
json.decode(row['r'] as String) as Map<String, Object?>);
168150

169-
if (hasSynced != currentStatus.hasSynced) {
170-
final status = SyncStatus(
171-
hasSynced: hasSynced,
172-
lastSyncedAt: lastCompleteSync,
173-
priorityStatusEntries: priorityStatusEntries,
174-
);
175-
setStatus(status);
176-
}
151+
setStatus((MutableSyncStatus()..applyFromCore(status))
152+
.immutableSnapshot(setLastSynced: true));
177153
}
178154

179155
/// Returns a [Future] which will resolve once at least one full sync cycle
180156
/// has completed (meaninng that the first consistent checkpoint has been
181157
/// reached across all buckets).
182158
///
183159
/// When [priority] is null (the default), this method waits for the first
184-
/// full sync checkpoint to complete. When set to a [BucketPriority] however,
160+
/// full sync checkpoint to complete. When set to a [StreamPriority] however,
185161
/// it completes once all buckets within that priority (as well as those in
186162
/// higher priorities) have been synchronized at least once.
187-
Future<void> waitForFirstSync({BucketPriority? priority}) async {
163+
Future<void> waitForFirstSync({StreamPriority? priority}) async {
188164
bool matches(SyncStatus status) {
189165
if (priority == null) {
190166
return status.hasSynced == true;

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ final class ConnectionManager {
197197
// would be equal and don't require an event. So, check again.
198198
if (newStatus != currentStatus) {
199199
_currentStatus = newStatus;
200-
_statusController.add(currentStatus);
200+
_statusController.add(_currentStatus);
201201
}
202202
}
203203
}

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ final class CoreSyncStatus {
8585
null => null,
8686
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
8787
},
88-
streams: (json['streams'] as List<Object?>?)
89-
?.map((e) =>
88+
streams: (json['streams'] as List<Object?>)
89+
.map((e) =>
9090
CoreActiveStreamSubscription.fromJson(e as Map<String, Object?>))
9191
.toList(),
9292
);

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ final class MutableSyncStatus {
9797
streams = status.streams;
9898
}
9999

100-
SyncStatus immutableSnapshot() {
100+
SyncStatus immutableSnapshot({bool setLastSynced = false}) {
101101
return SyncStatus(
102102
connected: connected,
103103
connecting: connecting,
@@ -106,7 +106,7 @@ final class MutableSyncStatus {
106106
downloadProgress: downloadProgress?.asSyncDownloadProgress,
107107
priorityStatusEntries: UnmodifiableListView(priorityStatusEntries),
108108
lastSyncedAt: lastSyncedAt,
109-
hasSynced: null, // Stream client is not supposed to set this value.
109+
hasSynced: setLastSynced ? lastSyncedAt != null : null,
110110
uploadError: uploadError,
111111
downloadError: downloadError,
112112
streamSubscriptions: streams,

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class StreamingSyncImplementation implements StreamingSync {
135135
void updateSubscriptions(List<SubscribedStream> streams) {
136136
_activeSubscriptions = streams;
137137
if (_nonLineSyncEvents.hasListener) {
138-
_nonLineSyncEvents.add(const AbortCurrentIteration());
138+
_nonLineSyncEvents.add(HandleChangedSubscriptions(streams));
139139
}
140140
}
141141

@@ -464,6 +464,7 @@ class StreamingSyncImplementation implements StreamingSync {
464464
_state.updateStatus((s) => s.setConnected());
465465
await handleLine(line as StreamingSyncLine);
466466
case UploadCompleted():
467+
case HandleChangedSubscriptions():
467468
// Only relevant for the Rust sync implementation.
468469
break;
469470
case AbortCurrentIteration():
@@ -613,6 +614,12 @@ final class _ActiveRustStreamingIteration {
613614

614615
_ActiveRustStreamingIteration(this.sync);
615616

617+
List<Object?> _encodeSubscriptions(List<SubscribedStream> subscriptions) {
618+
return sync._activeSubscriptions
619+
.map((s) => {'name': s.name, 'params': s.parameters})
620+
.toList();
621+
}
622+
616623
Future<void> syncIteration() async {
617624
try {
618625
await _control(
@@ -621,9 +628,7 @@ final class _ActiveRustStreamingIteration {
621628
'parameters': sync.options.params,
622629
'schema': convert.json.decode(sync.schemaJson),
623630
'include_defaults': sync.options.includeDefaultStreams,
624-
'active_streams': sync._activeSubscriptions
625-
.map((s) => {'name': s.name, 'params': s.parameters})
626-
.toList(),
631+
'active_streams': _encodeSubscriptions(sync._activeSubscriptions),
627632
}),
628633
);
629634
assert(_completedStream.isCompleted, 'Should have started streaming');
@@ -673,6 +678,9 @@ final class _ActiveRustStreamingIteration {
673678
break loop;
674679
case TokenRefreshComplete():
675680
await _control('refreshed_token');
681+
case HandleChangedSubscriptions(:final currentSubscriptions):
682+
await _control('update_subscriptions',
683+
convert.json.encode(_encodeSubscriptions(currentSubscriptions)));
676684
}
677685
}
678686
}
@@ -762,3 +770,9 @@ final class TokenRefreshComplete implements SyncEvent {
762770
final class AbortCurrentIteration implements SyncEvent {
763771
const AbortCurrentIteration();
764772
}
773+
774+
final class HandleChangedSubscriptions implements SyncEvent {
775+
final List<SubscribedStream> currentSubscriptions;
776+
777+
HandleChangedSubscriptions(this.currentSubscriptions);
778+
}

packages/powersync_core/test/sync/stream_test.dart

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,32 @@ void main() {
183183
),
184184
);
185185
});
186+
187+
test('changes subscriptions dynamically', () async {
188+
await waitForConnection();
189+
syncService.addKeepAlive();
190+
191+
final subscription = await database.syncStream('a').subscribe();
192+
syncService.endCurrentListener();
193+
final request = await syncService.waitForListener;
194+
expect(
195+
json.decode(await request.readAsString()),
196+
containsPair(
197+
'streams',
198+
containsPair('subscriptions', [
199+
{
200+
'stream': 'a',
201+
'parameters': null,
202+
'override_priority': null,
203+
},
204+
]),
205+
),
206+
);
207+
208+
// Given that the subscription has a TTL, dropping the handle should not
209+
// re-subscribe.
210+
await subscription.unsubscribe();
211+
await pumpEventQueue();
212+
expect(syncService.controller.hasListener, isTrue);
213+
});
186214
}

0 commit comments

Comments
 (0)