Skip to content

Commit ba96ccf

Browse files
author
Ivan Dlugos
committed
Sync - make sure Observers (query stream) and SyncClient can't be used at the same time
1 parent 7f9563b commit ba96ccf

File tree

5 files changed

+54
-6
lines changed

5 files changed

+54
-6
lines changed

example/flutter/objectbox_demo_sync/lib/main.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ class ViewModel {
7777

7878
void removeNote(Note note) => _box.remove(note.id);
7979

80-
Stream<List<Note>> get queryStream => _query.findStream();
80+
// Note: using query.findStream() and sync.client() in the same app is
81+
// currently not supported so this app is currently not working and only
82+
// servers as an example on how and when to start a sync client.
83+
// Stream<List<Note>> get queryStream => _query.findStream();
84+
Stream<List<Note>> get queryStream => Stream<List<Note>>.empty();
8185

8286
List<Note> get allNotes => _query.find();
8387

lib/src/observable.dart

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class _Observable {
3535
}
3636

3737
static void subscribe(Store store) {
38+
syncOrObserversExclusive.mark(store);
39+
3840
final callback = Pointer.fromFunction<obx_observer_t>(_anyCallback);
3941
final storePtr = store.ptr;
4042
_anyObserver[storePtr.address] =
@@ -53,18 +55,19 @@ class _Observable {
5355
StoreCloseObserver.removeListener(store, _anyObserver[storeAddress]);
5456
bindings.obx_observer_close(_anyObserver[storeAddress]);
5557
_anyObserver.remove(storeAddress);
58+
syncOrObserversExclusive.unmark(store);
5659
}
60+
61+
static bool isSubscribed(Store store) =>
62+
_Observable._anyObserver.containsKey(store.ptr.address);
5763
}
5864

5965
extension Streamable<T> on Query<T> {
6066
void _setup() {
61-
final storePtr = store.ptr;
62-
63-
if (!_Observable._anyObserver.containsKey(storePtr)) {
67+
if (!_Observable.isSubscribed(store)) {
6468
_Observable.subscribe(store);
6569
}
66-
67-
final storeAddress = storePtr.address;
70+
final storeAddress = store.ptr.address;
6871

6972
_Observable._any[storeAddress] ??= <int, Any>{};
7073
_Observable._any[storeAddress][entityId] ??= (u, _, __) {

lib/src/sync.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class SyncClient {
101101
_cSync = nullptr;
102102
SyncClientsStorage.remove(_store);
103103
StoreCloseObserver.removeListener(_store, this);
104+
syncOrObserversExclusive.unmark(_store);
104105
checkObx(err);
105106
}
106107

@@ -235,6 +236,7 @@ class Sync {
235236
if (SyncClientsStorage.containsKey(store)) {
236237
throw Exception('Only one sync client can be active for a store');
237238
}
239+
syncOrObserversExclusive.mark(store);
238240
final client = SyncClient(store, serverUri, creds);
239241
SyncClientsStorage[store] = client;
240242
StoreCloseObserver.addListener(store, client, client.close);

lib/src/util.dart

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,23 @@ class StoreCloseObserver {
4242

4343
/// Global internal storage of sync clients - one client per store.
4444
final Map<Store, SyncClient> SyncClientsStorage = {};
45+
46+
// Currently, either SyncClient or Observers can be used at the same time.
47+
// TODO: lift this condition after #142 is fixed.
48+
class SyncOrObserversExclusive {
49+
final _map = <Store, bool>{};
50+
51+
void mark(Store store) {
52+
if (_map.containsKey(store)) {
53+
throw Exception(
54+
'Using observers/query streams in combination with SyncClient is currently not supported');
55+
}
56+
_map[store] = true;
57+
}
58+
59+
void unmark(Store store) {
60+
_map.remove(store);
61+
}
62+
}
63+
64+
final syncOrObserversExclusive = SyncOrObserversExclusive();

test/sync_test.dart

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:typed_data';
33

44
import 'package:test/test.dart';
55
import 'package:objectbox/objectbox.dart';
6+
import 'package:objectbox/observable.dart';
67
import 'package:objectbox/src/bindings/constants.dart';
78

89
import 'entity.dart';
@@ -56,6 +57,24 @@ void main() {
5657
if (Sync.isAvailable()) {
5758
// TESTS to run when SYNC is available
5859

60+
group('Circumvent issue #142 - async callbacks error', () {
61+
final error = throwsA(predicate((Exception e) => e.toString().contains(
62+
'Using observers/query streams in combination with SyncClient is currently not supported')));
63+
64+
test('Must not start an Observer when SyncClient is active', () {
65+
createClient(store);
66+
expect(() => env.box.query().build().findStream(), error);
67+
});
68+
69+
test('Must not start SyncClient when an Observer is active', () {
70+
final error = throwsA(predicate((Exception e) => e.toString().contains(
71+
'Using observers/query streams in combination with SyncClient is currently not supported')));
72+
73+
SyncClient c = createClient(store);
74+
expect(() => env.box.query().build().findStream(), error);
75+
});
76+
});
77+
5978
test('SyncClient lifecycle', () {
6079
expect(store.syncClient(), isNull);
6180

0 commit comments

Comments
 (0)