Skip to content

Commit d3aa29a

Browse files
committed
Web: Fix reconnect
1 parent 50fbb95 commit d3aa29a

File tree

1 file changed

+62
-27
lines changed

1 file changed

+62
-27
lines changed

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 62 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,8 @@ class StreamingSyncImplementation implements StreamingSync {
528528
}
529529

530530
Future<http.StreamedResponse?> _postStreamRequest(
531-
Object? data, bool acceptBson) async {
531+
Object? data, bool acceptBson,
532+
{Future<void>? onAbort}) async {
532533
const ndJson = 'application/x-ndjson';
533534
const bson = 'application/vnd.powersync.bson-stream';
534535

@@ -538,8 +539,8 @@ class StreamingSyncImplementation implements StreamingSync {
538539
}
539540
final uri = credentials.endpointUri('sync/stream');
540541

541-
final request =
542-
http.AbortableRequest('POST', uri, abortTrigger: _abort!.onAbort);
542+
final request = http.AbortableRequest('POST', uri,
543+
abortTrigger: onAbort ?? _abort!.onAbort);
543544
request.headers['Content-Type'] = 'application/json';
544545
request.headers['Authorization'] = "Token ${credentials.token}";
545546
request.headers['Accept'] =
@@ -645,9 +646,10 @@ final class _ActiveRustStreamingIteration {
645646
}
646647
}
647648

648-
Stream<ReceivedLine> _receiveLines(Object? data) {
649+
Stream<ReceivedLine> _receiveLines(Object? data,
650+
{required Future<void> onAbort}) {
649651
return streamFromFutureAwaitInCancellation(
650-
sync._postStreamRequest(data, true))
652+
sync._postStreamRequest(data, true, onAbort: onAbort))
651653
.asyncExpand<Object /* Uint8List | String */ >((response) {
652654
if (response == null) {
653655
return null;
@@ -662,33 +664,66 @@ final class _ActiveRustStreamingIteration {
662664

663665
Future<RustSyncIterationResult> _handleLines(
664666
EstablishSyncStream request) async {
667+
// This is a workaround for https://github.com/dart-lang/http/issues/1820:
668+
// When cancelling the stream subscription of an HTTP response with the
669+
// fetch-based client implementation, cancelling the subscription is delayed
670+
// until the next chunk (typically a token_expires_in message in our case).
671+
// So, before cancelling, we complete an abort controller for the request to
672+
// speed things up. This is not an issue in most cases because the abort
673+
// controller on this stream would be completed when disconnecting. But
674+
// when switching sync streams, that's not the case and we need a second
675+
// abort controller for the inner iteration.
676+
final innerAbort = Completer<void>.sync();
665677
final events = addBroadcast(
666-
_receiveLines(request.request), sync._nonLineSyncEvents.stream);
678+
_receiveLines(
679+
request.request,
680+
onAbort: Future.any([
681+
sync._abort!.onAbort,
682+
innerAbort.future,
683+
]),
684+
),
685+
sync._nonLineSyncEvents.stream,
686+
);
667687

668688
var needsImmediateRestart = false;
669689
loop:
670-
await for (final event in events) {
671-
if (!_isActive || sync.aborted) {
672-
break;
673-
}
690+
try {
691+
await for (final event in events) {
692+
if (!_isActive || sync.aborted) {
693+
innerAbort.complete();
694+
break;
695+
}
674696

675-
switch (event) {
676-
case ReceivedLine(line: final Uint8List line):
677-
_triggerCrudUploadOnFirstLine();
678-
await _control('line_binary', line);
679-
case ReceivedLine(line: final line as String):
680-
_triggerCrudUploadOnFirstLine();
681-
await _control('line_text', line);
682-
case UploadCompleted():
683-
await _control('completed_upload');
684-
case AbortCurrentIteration(:final hideDisconnectState):
685-
needsImmediateRestart = hideDisconnectState;
686-
break loop;
687-
case TokenRefreshComplete():
688-
await _control('refreshed_token');
689-
case HandleChangedSubscriptions(:final currentSubscriptions):
690-
await _control('update_subscriptions',
691-
convert.json.encode(_encodeSubscriptions(currentSubscriptions)));
697+
switch (event) {
698+
case ReceivedLine(line: final Uint8List line):
699+
_triggerCrudUploadOnFirstLine();
700+
await _control('line_binary', line);
701+
case ReceivedLine(line: final line as String):
702+
_triggerCrudUploadOnFirstLine();
703+
await _control('line_text', line);
704+
case UploadCompleted():
705+
await _control('completed_upload');
706+
case AbortCurrentIteration(:final hideDisconnectState):
707+
innerAbort.complete();
708+
needsImmediateRestart = hideDisconnectState;
709+
break loop;
710+
case TokenRefreshComplete():
711+
await _control('refreshed_token');
712+
case HandleChangedSubscriptions(:final currentSubscriptions):
713+
await _control(
714+
'update_subscriptions',
715+
convert.json
716+
.encode(_encodeSubscriptions(currentSubscriptions)));
717+
}
718+
}
719+
} on http.RequestAbortedException {
720+
// Unlike a regular cancellation, cancelling via the abort controller
721+
// emits an error. We did mean to just cancel the stream, so we can
722+
// safely ignore that.
723+
if (innerAbort.isCompleted) {
724+
// ignore
725+
} else {
726+
rethrow;
692727
}
693728
}
694729

0 commit comments

Comments
 (0)