Skip to content

Commit 77743cc

Browse files
authored
Merge pull request #64 from powersync-ja/client-id
Support client_id and User-Agent
2 parents 373924d + 44f907e commit 77743cc

File tree

11 files changed

+45
-7
lines changed

11 files changed

+45
-7
lines changed

.changeset/rude-timers-matter.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-core': minor
3+
'@powersync/service-image': minor
4+
---
5+
6+
Support client_id parameter and User-Agent headers.

packages/service-core/src/routes/configure-rsocket.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
2323

2424
router.applyWebSocketEndpoints(server, {
2525
contextProvider: async (data: Buffer) => {
26-
const { token } = RSocketContextMeta.decode(deserialize(data) as any);
26+
const { token, user_agent } = RSocketContextMeta.decode(deserialize(data) as any);
2727

2828
if (!token) {
2929
throw new errors.AuthorizationError('No token provided');
@@ -38,6 +38,7 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
3838
}
3939
return {
4040
token,
41+
user_agent,
4142
...context,
4243
token_errors: token_errors,
4344
system

packages/service-core/src/routes/endpoints/checkpointing.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import * as util from '../../util/util-index.js';
55
import { authUser } from '../auth.js';
66
import { routeDefinition } from '../router.js';
77

8-
const WriteCheckpointRequest = t.object({});
8+
const WriteCheckpointRequest = t.object({
9+
client_id: t.string.optional()
10+
});
911

1012
export const writeCheckpoint = routeDefinition({
1113
path: '/write-checkpoint.json',
@@ -30,8 +32,10 @@ export const writeCheckpoint2 = routeDefinition({
3032
validator: schema.createTsCodecValidator(WriteCheckpointRequest, { allowAdditional: true }),
3133
handler: async (payload) => {
3234
const { user_id, system } = payload.context;
35+
const client_id = payload.params.client_id;
36+
const full_user_id = util.checkpointUserId(user_id, client_id);
3337
const storage = system.storage;
34-
const write_checkpoint = await util.createWriteCheckpoint(system.requirePgPool(), storage, user_id!);
38+
const write_checkpoint = await util.createWriteCheckpoint(system.requirePgPool(), storage, full_user_id);
3539
return {
3640
write_checkpoint: String(write_checkpoint)
3741
};

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
124124
disposer();
125125
logger.info(`Sync stream complete`, {
126126
user_id: syncParams.user_id,
127+
user_agent: context.user_agent,
127128
operations_synced: tracker.operationsSynced,
128129
data_synced_bytes: tracker.dataSyncedBytes
129130
});

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ export const syncStreamed = routeDefinition({
2121
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
2222
handler: async (payload) => {
2323
const system = payload.context.system;
24+
const headers = payload.request.headers;
25+
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
26+
const clientId = payload.params.client_id;
2427

2528
if (system.closed) {
2629
throw new errors.JourneyError({
@@ -92,6 +95,8 @@ export const syncStreamed = routeDefinition({
9295
Metrics.getInstance().concurrent_connections.add(-1);
9396
logger.info(`Sync stream complete`, {
9497
user_id: syncParams.user_id,
98+
client_id: clientId,
99+
user_agent: userAgent,
95100
operations_synced: tracker.operationsSynced,
96101
data_synced_bytes: tracker.dataSyncedBytes
97102
});

packages/service-core/src/routes/router-socket.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ import { Context } from './router.js';
99
export type SocketRouteGenerator = (router: ReactiveSocketRouter<Context>) => IReactiveStream;
1010

1111
export const RSocketContextMeta = t.object({
12-
token: t.string
12+
token: t.string,
13+
user_agent: t.string.optional()
1314
});

packages/service-core/src/routes/router.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ export type Context = {
1111

1212
token_payload?: auth.JwtPayload;
1313
token_errors?: string[];
14+
/**
15+
* Only on websocket endpoints.
16+
*/
17+
user_agent?: string;
1418
};
1519

1620
export type BasicRouterRequest = {

packages/service-core/src/sync/sync.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ async function* streamResponseInner(
9393
}
9494
}
9595

96-
const stream = storage.watchWriteCheckpoint(syncParams.token_parameters.user_id as string, signal);
96+
const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id);
97+
const stream = storage.watchWriteCheckpoint(checkpointUserId, signal);
9798
for await (const next of stream) {
9899
const { base, writeCheckpoint } = next;
99100
const checkpoint = base.checkpoint;

packages/service-core/src/util/protocol-types.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,12 @@ export const StreamingSyncRequest = t.object({
9494
/**
9595
* Client parameters to be passed to the sync rules.
9696
*/
97-
parameters: t.record(t.any).optional()
97+
parameters: t.record(t.any).optional(),
98+
99+
/**
100+
* Unique client id.
101+
*/
102+
client_id: t.string.optional()
98103
});
99104

100105
export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;

packages/service-core/src/util/utils.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,13 @@ export async function createWriteCheckpoint(
120120
logger.info(`Write checkpoint 2: ${JSON.stringify({ lsn, id: String(id) })}`);
121121
return id;
122122
}
123+
124+
export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
125+
if (user_id == null) {
126+
throw new Error('user_id is required');
127+
}
128+
if (client_id == null) {
129+
return user_id;
130+
}
131+
return `${user_id}/${client_id}`;
132+
}

0 commit comments

Comments
 (0)