@@ -219,4 +219,45 @@ void main() {
219
219
var status = await stream.next;
220
220
expect (status.forStream (subscription), isNotNull);
221
221
});
222
+
223
+ test ('unsubscribing multiple times has no effect' , () async {
224
+ final a = await database.syncStream ('a' ).subscribe ();
225
+ final aAgain = await database.syncStream ('a' ).subscribe ();
226
+ a.unsubscribe ();
227
+ a.unsubscribe (); // Should not decrement the refcount again
228
+
229
+ // Pretend the streams are expired - they should still be requested because
230
+ // the core extension extends the lifetime of streams currently referenced
231
+ // before connecting.
232
+ await database.execute (
233
+ 'UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000' );
234
+
235
+ await waitForConnection ();
236
+ final request = await syncService.waitForListener;
237
+ expect (
238
+ json.decode (await request.readAsString ()),
239
+ containsPair (
240
+ 'streams' ,
241
+ containsPair ('subscriptions' , isNotEmpty),
242
+ ),
243
+ );
244
+ aAgain.unsubscribe ();
245
+ });
246
+
247
+ test ('unsubscribeAll' , () async {
248
+ final a = await database.syncStream ('a' ).subscribe ();
249
+ await database.syncStream ('a' ).unsubscribeAll ();
250
+
251
+ // Despite a being active, it should not be requested.
252
+ await waitForConnection ();
253
+ final request = await syncService.waitForListener;
254
+ expect (
255
+ json.decode (await request.readAsString ()),
256
+ containsPair (
257
+ 'streams' ,
258
+ containsPair ('subscriptions' , isEmpty),
259
+ ),
260
+ );
261
+ a.unsubscribe ();
262
+ });
222
263
}
0 commit comments