Skip to content

Sync streams #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open

Sync streams #112

wants to merge 29 commits into from

Conversation

simolus3
Copy link
Contributor

@simolus3 simolus3 commented Jul 22, 2025

This adds support for sync streams to the core extension.

Local sync stream subscriptions are managed by invoking powersync_control('subscriptions', ...), where the second argument can be used to

  • subscribe to a stream, supplying its name, parameters to use for the subscription and optionally a TTL + priority.
  • unsubscribe from a stream by its name and parameters.

Stream subscriptions are stored in a new ps_stream_subscriptions table (with the stream's name and subscription parameters being the primary key). Rows are created

  • when a stream is subscribed to explicitly.
  • when we receive a stream from the sync service by default

It's also possible to explicitly subscribe to a default stream by not supplying any parameters - this is useful to e.g. override the priority of such a stream. Another reason for storing default streams is that they should be visible when a user requests a list of all streams. We also store whether a particular stream has been synced (and when it has last been synced).

When the SDK wants to connect, it supplies a list of all stream subscriptions that are currently (SDKs don't have to take TTL/expiry into consideration) active in the start command. The Rust client will automaticall increase the expiry date of active streams when receiving keepalive messages.
Because we expect the set of currently active stream subscription to potentially update very frequently (whenever subscribe or unsubscribe is called anywhere in the app, potentially on every navigation), the set of currently active subscriptions can be updated within a stream iteration. Only when the client detects that a TTL has expired or an new subscription not previously cached has been added would it request a new iteration.

The sync status interface is expanded to include information about all streams part of the current sync information. We also include all bucket names for each stream here, which is useful for client SDKs to compute sync progress for each stream.

Finally, there's a new powersync_offline_sync_status() function that returns the sync status with the initial hasSynced and lastSyncedAt fields for every stream and priority. Client SDKs can use this to replace the manual SQL query they have for that today.

@simolus3 simolus3 force-pushed the streams branch 2 times, most recently from 13f8550 to 0c63d69 Compare August 12, 2025 09:27
@simolus3 simolus3 marked this pull request as ready for review August 13, 2025 09:32
@simolus3 simolus3 requested a review from rkistner August 13, 2025 09:32
Copy link
Contributor

@rkistner rkistner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the SDK wants to connect, it supplies a list of all stream subscriptions that are currently (SDKs don't have to take TTL/expiry into consideration) active in the start command. The Rust client will automaticall increase the expiry date of active streams when receiving keepalive messages.

Could you explain how this relates tho the subscribe/unsubscribe commands? Is it supposed to be used in addition to subscribe/unsubscribe, or as an alternative? The way it looks to me, is that the client could have a sequence like this (simplified):

subscribe(A)
subscribe(B)
start(active_subscriptions: [A, B])
subscribe(C)
update_subscriptions([A, B, C])
unusubscribe(A)
update_subscriptions([B, C])

In this example it feels like the same subscription info is duplicated in the calls - not sure if I'm missing something here.

pub id: i64,
pub name: String,
pub parameters: Option<Box<JsonString>>,
pub associated_buckets: Vec<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if having this as part of the auto-reported status could have a performance impact in some cases. Specifically, if the user syncs a default stream with 10k buckets, each status update would be a 100kb+ in size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want this information on the sync status to be able to derive per-stream progress information (an aggregation we do in the SDKs).

So I think the easiest way to resolve this may be to encode changes as deltas instead of sending complete snapshots of the sync status every time. E.g. we could have an streams: /* unchanged */ marker on most updates, and only send streams once for the checkpoint.

If that approach sounds reasonable to you, I can take a look with a follow-up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be it feasible to report the per-stream progress directly from the Rust SDK, instead of doing that aggregation in the SDKs?

Deltas could also help, depending on how often it changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of moving progress calculation entirely into the core extension. I've configured serde to skip serializing associated_buckets. Instead, we'll add a progress field on each stream subscription.

There was a similar issue serializing DownloadSyncStatus::downloading (which is a BTreeMap<String, BucketProgress> for each bucket). I've changed that to report one entry per progress instead, so client SDKs don't see buckets at all.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants