From 0e900f0fd1d5a5053927c8fa9ee9dac267accb4c Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Tue, 25 Nov 2025 12:53:46 +1300 Subject: [PATCH 1/2] feat: nodejs integration tests --- .github/workflows/integration_tests.yml | 4 +- docker-compose.tests.yml | 29 +++- test/integration/{ => deno}/tests.ts | 0 test/integration/node/package.json | 14 ++ test/integration/node/tests.js | 172 ++++++++++++++++++++++++ 5 files changed, 215 insertions(+), 4 deletions(-) rename test/integration/{ => deno}/tests.ts (100%) create mode 100644 test/integration/node/package.json create mode 100644 test/integration/node/tests.js diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 3981300df..26e09bc3f 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -29,5 +29,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Run integration test - run: docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from test-runner + run: | + docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from deno-test-runner deno-test-runner + docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from node-test-runner node-test-runner diff --git a/docker-compose.tests.yml b/docker-compose.tests.yml index 56f5466e8..2f9b7c1cb 100644 --- a/docker-compose.tests.yml +++ b/docker-compose.tests.yml @@ -56,8 +56,7 @@ services: retries: 5 start_period: 5s - # Deno test runner - test-runner: + deno-test-runner: image: denoland/deno:alpine-2.5.6 container_name: deno-test-runner depends_on: @@ -66,7 +65,7 @@ services: test_db: condition: service_healthy volumes: - - ./test/integration/tests.ts:/app/tests.ts:ro + - ./test/integration/deno/tests.ts:/app/tests.ts:ro working_dir: /app command: > sh -c " @@ -78,6 +77,30 @@ services: extra_hosts: - "realtime-dev.localhost:host-gateway" + node-test-runner: + image: node:22-alpine + container_name: node-test-runner + depends_on: + test_realtime: + condition: service_healthy + test_db: + condition: service_healthy + volumes: + - ./test/integration/node/tests.js:/app/tests.js:ro + - ./test/integration/node/package.json:/app/package.json:ro + working_dir: /app + command: > + sh -c " + echo 'Installing dependencies...' && + npm install && + echo 'Running tests...' && + npm test + " + networks: + - test-network + extra_hosts: + - "realtime-dev.localhost:host-gateway" + networks: test-network: driver: bridge diff --git a/test/integration/tests.ts b/test/integration/deno/tests.ts similarity index 100% rename from test/integration/tests.ts rename to test/integration/deno/tests.ts diff --git a/test/integration/node/package.json b/test/integration/node/package.json new file mode 100644 index 000000000..bfdde4604 --- /dev/null +++ b/test/integration/node/package.json @@ -0,0 +1,14 @@ +{ + "name": "realtime-integration-tests", + "version": "1.0.0", + "type": "module", + "scripts": { + "test": "mocha tests.js" + }, + "dependencies": { + "@supabase/supabase-js": "^2.39.0" + }, + "devDependencies": { + "mocha": "^10.2.0" + } +} diff --git a/test/integration/node/tests.js b/test/integration/node/tests.js new file mode 100644 index 000000000..1c8ba32af --- /dev/null +++ b/test/integration/node/tests.js @@ -0,0 +1,172 @@ +import { RealtimeClient } from "@supabase/supabase-js"; +import { describe, it } from "mocha"; +import { strict as assert } from "assert"; + +const sleep = (seconds) => new Promise(resolve => setTimeout(resolve, seconds * 1000)); + +const withDeadline = (fn, ms) => { + return async function(...args) { + return Promise.race([ + fn(...args), + new Promise((_, reject) => + setTimeout(() => reject(new Error(`Test timed out after ${ms}ms`)), ms) + ) + ]); + }; +}; + +const url = "http://realtime-dev.localhost:4100/socket"; +const serviceRoleKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIwNzU3NzYzODIsInJlZiI6IjEyNy4wLjAuMSIsInJvbGUiOiJzZXJ2aWNlX3JvbGUiLCJpYXQiOjE3NjA3NzYzODJ9.nupH8pnrOTgK9Xaq8-D4Ry-yQ-PnlXEagTVywQUJVIE"; +const apiKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIwNzU2NjE3MjEsInJlZiI6IjEyNy4wLjAuMSIsInJvbGUiOiJhdXRoZW50aWNhdGVkIiwiaWF0IjoxNzYwNjYxNzIxfQ.PxpBoelC9vWQ2OVhmwKBUDEIKgX7MpgSdsnmXw7UdYk"; + +const realtimeV1 = { vsn: '1.0.0', params: { apikey: apiKey }, heartbeatIntervalMs: 5000, timeout: 5000 }; +const realtimeV2 = { vsn: '2.0.0', params: { apikey: apiKey }, heartbeatIntervalMs: 5000, timeout: 5000 }; +const realtimeServiceRole = { vsn: '2.0.0', logger: console.log, params: { apikey: serviceRoleKey }, heartbeatIntervalMs: 5000, timeout: 5000 }; + +let clientV1 = null; +let clientV2 = null; + +describe("broadcast extension", function() { + // Increase timeout for all tests in this suite + this.timeout(10000); + + it("users with different versions can receive self broadcast", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1); + clientV2 = new RealtimeClient(url, realtimeV2); + let resultV1 = null; + let resultV2 = null; + let event = crypto.randomUUID(); + let topic = "topic:" + crypto.randomUUID(); + let expectedPayload = { message: crypto.randomUUID() }; + const config = { config: { broadcast: { ack: true, self: true } } }; + + const channelV1 = clientV1 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV1 = payload)) + .subscribe(); + + const channelV2 = clientV2 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV2 = payload)) + .subscribe(); + + while (channelV1.state !== "joined" || channelV2.state !== "joined") await sleep(0.2); + + // Send from V1 client - both should receive + await channelV1.send({ + type: "broadcast", + event, + payload: expectedPayload, + }); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assert.deepEqual(resultV1, expectedPayload); + assert.deepEqual(resultV2, expectedPayload); + + // Reset results for second test + resultV1 = null; + resultV2 = null; + let expectedPayload2 = { message: crypto.randomUUID() }; + + // Send from V2 client - both should receive + await channelV2.send({ + type: "broadcast", + event, + payload: expectedPayload2, + }); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assert.deepEqual(resultV1, expectedPayload2); + assert.deepEqual(resultV2, expectedPayload2); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await stopClient(clientV1); + await stopClient(clientV2); + clientV1 = null; + clientV2 = null; + }, 5000)); + + it("v2 can send/receive binary payload", withDeadline(async () => { + clientV2 = new RealtimeClient(url, realtimeV2); + let result = null; + let event = crypto.randomUUID(); + let topic = "topic:" + crypto.randomUUID(); + const expectedPayload = new ArrayBuffer(2); + const uint8 = new Uint8Array(expectedPayload); + uint8[0] = 125; + uint8[1] = 255; + + const config = { config: { broadcast: { ack: true, self: true } } }; + + const channelV2 = clientV2 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (result = payload)) + .subscribe(); + + while (channelV2.state !== "joined") await sleep(0.2); + + await channelV2.send({ + type: "broadcast", + event, + payload: expectedPayload, + }); + + while (result == null) await sleep(0.2); + + assert.deepEqual(result, expectedPayload); + + await channelV2.unsubscribe(); + + await stopClient(clientV2); + clientV2 = null; + }, 5000)); + + it("users with different versions can receive broadcasts from endpoint", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1); + clientV2 = new RealtimeClient(url, realtimeV2); + let resultV1 = null; + let resultV2 = null; + let event = crypto.randomUUID(); + let topic = "topic:" + crypto.randomUUID(); + let expectedPayload = { message: crypto.randomUUID() }; + const config = { config: { broadcast: { ack: true, self: true } } }; + + const channelV1 = clientV1 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV1 = payload)) + .subscribe(); + + const channelV2 = clientV2 + .channel(topic, config) + .on("broadcast", { event }, ({ payload }) => (resultV2 = payload)) + .subscribe(); + + while (channelV1.state !== "joined" || channelV2.state !== "joined") await sleep(0.2); + + // Send from unsubscribed channel - both should receive + new RealtimeClient(url, realtimeServiceRole).channel(topic, config).httpSend(event, expectedPayload); + + while (resultV1 == null || resultV2 == null) await sleep(0.2); + + assert.deepEqual(resultV1, expectedPayload); + assert.deepEqual(resultV2, expectedPayload); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await stopClient(clientV1); + await stopClient(clientV2); + clientV1 = null; + clientV2 = null; + }, 5000)); +}); + +async function stopClient(client) { + if (client) { + await client.removeAllChannels(); + } +} From 02165bcf869137f1bd704fc9ffebb7f0a5eea1b8 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Tue, 25 Nov 2025 16:57:40 +1300 Subject: [PATCH 2/2] feat: add postgres changes tests to deno tests --- test/integration/deno/tests.ts | 288 ++++++++++++++++++++++++++++----- 1 file changed, 246 insertions(+), 42 deletions(-) diff --git a/test/integration/deno/tests.ts b/test/integration/deno/tests.ts index 036255f17..4d6b59da7 100644 --- a/test/integration/deno/tests.ts +++ b/test/integration/deno/tests.ts @@ -3,6 +3,7 @@ import { sleep } from "https://deno.land/x/sleep/mod.ts"; import { describe, it } from "jsr:@std/testing/bdd"; import { assertEquals } from "jsr:@std/assert"; import { deadline } from "jsr:@std/async/deadline"; +import { Client } from "jsr:@db/postgres@0.19"; const withDeadline = Promise>(fn: Fn, ms: number): Fn => ((...args) => deadline(fn(...args), ms)) as Fn; @@ -17,6 +18,40 @@ const realtimeServiceRole = { vsn: '2.0.0', logger: console.log, params: { apike let clientV1: RealtimeClient | null; let clientV2: RealtimeClient | null; +let dbClient: Client | null; + +async function getDbClient() { + const client = new Client({ + user: "postgres", + password: "postgres", + database: "postgres", + hostname: "localhost", + port: 5532, + }); + await client.connect(); + return client; +} + +async function cleanupTestData(client: Client, ids: number[]) { + if (ids.length > 0) { + await client.queryArray( + `DELETE FROM public.test_tenant WHERE id = ANY($1)`, + [ids] + ); + } +} + +dbClient = await getDbClient(); + +await dbClient.queryArray( + `drop publication if exists supabase_realtime; + drop table if exists public.test_tenant; + create table public.test_tenant ( id SERIAL PRIMARY KEY, details text ); + grant all on table public.test_tenant to anon; + grant all on table public.test_tenant to postgres; + grant all on table public.test_tenant to authenticated; + create publication supabase_realtime for table public.test_tenant;`, +); describe("broadcast extension", { sanitizeOps: false, sanitizeResources: false }, () => { it("users with different versions can receive self broadcast", withDeadline(async () => { @@ -154,48 +189,217 @@ describe("broadcast extension", { sanitizeOps: false, sanitizeResources: false } }, 5000)); }); -// describe("presence extension", () => { -// it("user is able to receive presence updates", async () => { -// let result: any = []; -// let error = null; -// let topic = "topic:" + crypto.randomUUID(); -// let keyV1 = "key V1"; -// let keyV2 = "key V2"; -// -// const configV1 = { config: { presence: { keyV1 } } }; -// const configV2 = { config: { presence: { keyV1 } } }; -// -// const channelV1 = clientV1 -// .channel(topic, configV1) -// .on("presence", { event: "join" }, ({ key, newPresences }) => -// result.push({ key, newPresences }) -// ) -// .subscribe(); -// -// const channelV2 = clientV2 -// .channel(topic, configV2) -// .on("presence", { event: "join" }, ({ key, newPresences }) => -// result.push({ key, newPresences }) -// ) -// .subscribe(); -// -// while (channelV1.state != "joined" || channelV2.state != "joined") await sleep(0.2); -// -// const resV1 = await channelV1.track({ key: keyV1 }); -// const resV2 = await channelV2.track({ key: keyV2 }); -// -// if (resV1 == "timed out" || resV2 == "timed out") error = resV1 || resV2; -// -// sleep(2.2); -// -// // FIXME write assertions -// console.log(result) -// let presences = result[0].newPresences[0]; -// assertEquals(result[0].key, keyV1); -// assertEquals(presences.message, message); -// assertEquals(error, null); -// }); -// }); +describe("postgres_changes extension", { sanitizeOps: false, sanitizeResources: false }, () => { + it("users with different versions can receive INSERT events", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1); + clientV2 = new RealtimeClient(url, realtimeV2); + + let resultV1 = null; + let resultV2 = null; + const testDetails = `test-insert-${crypto.randomUUID()}`; + const createdIds: number[] = []; + + const channelV1 = clientV1 + .channel("test-channel-v1") + .on( + "postgres_changes", + { event: "INSERT", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.new.details === testDetails) { + resultV1 = payload; + } + } + ) + .subscribe(); + + const channelV2 = clientV2 + .channel("test-channel-v2") + .on( + "postgres_changes", + { event: "INSERT", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.new.details === testDetails) { + resultV2 = payload; + } + } + ) + .subscribe(); + + while (channelV1.state !== "joined" || channelV2.state !== "joined") { + await sleep(0.2); + } + + // Perform INSERT + const result = await dbClient.queryObject<{ id: number }>( + `INSERT INTO public.test_tenant (details) VALUES ($1) RETURNING id`, + [testDetails] + ); + createdIds.push(result.rows[0].id); + + while (resultV1 == null || resultV2 == null) { + await sleep(0.2); + } + + assertEquals(resultV1.new.details, testDetails); + assertEquals(resultV2.new.details, testDetails); + assertEquals(resultV1.eventType, "INSERT"); + assertEquals(resultV2.eventType, "INSERT"); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await cleanupTestData(dbClient, createdIds); + await stopClient(clientV1); + await stopClient(clientV2); + + clientV1 = null; + clientV2 = null; + }, 10000)); + + it("users with different versions can receive UPDATE events", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1); + clientV2 = new RealtimeClient(url, realtimeV2); + + let resultV1 = null; + let resultV2 = null; + const initialDetails = `test-initial-${crypto.randomUUID()}`; + const updatedDetails = `test-updated-${crypto.randomUUID()}`; + const createdIds: number[] = []; + + // Create initial record + const insertResult = await dbClient.queryObject<{ id: number }>( + `INSERT INTO public.test_tenant (details) VALUES ($1) RETURNING id`, + [initialDetails] + ); + const recordId = insertResult.rows[0].id; + createdIds.push(recordId); + + const channelV1 = clientV1 + .channel("test-channel-v1") + .on( + "postgres_changes", + { event: "UPDATE", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.new.id === recordId) { + resultV1 = payload; + } + } + ) + .subscribe(); + + const channelV2 = clientV2 + .channel("test-channel-v2") + .on( + "postgres_changes", + { event: "UPDATE", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.new.id === recordId) { + resultV2 = payload; + } + } + ) + .subscribe(); + + while (channelV1.state !== "joined" || channelV2.state !== "joined") { + await sleep(0.2); + } + + // Perform UPDATE + await dbClient.queryArray( + `UPDATE public.test_tenant SET details = $1 WHERE id = $2`, + [updatedDetails, recordId] + ); + + while (resultV1 == null || resultV2 == null) { + await sleep(0.2); + } + + assertEquals(resultV1.new.details, updatedDetails); + assertEquals(resultV2.new.details, updatedDetails); + assertEquals(resultV1.eventType, "UPDATE"); + assertEquals(resultV2.eventType, "UPDATE"); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await cleanupTestData(dbClient, createdIds); + await stopClient(clientV1); + await stopClient(clientV2); + + clientV1 = null; + clientV2 = null; + }, 10000)); + + it("users with different versions can receive DELETE events", withDeadline(async () => { + clientV1 = new RealtimeClient(url, realtimeV1); + clientV2 = new RealtimeClient(url, realtimeV2); + + let resultV1 = null; + let resultV2 = null; + const testDetails = `test-delete-${crypto.randomUUID()}`; + + // Create record to delete + const insertResult = await dbClient.queryObject<{ id: number }>( + `INSERT INTO public.test_tenant (details) VALUES ($1) RETURNING id`, + [testDetails] + ); + const recordId = insertResult.rows[0].id; + + const channelV1 = clientV1 + .channel("test-channel-v1") + .on( + "postgres_changes", + { event: "DELETE", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.old.id === recordId) { + resultV1 = payload; + } + } + ) + .subscribe(); + + const channelV2 = clientV2 + .channel("test-channel-v2") + .on( + "postgres_changes", + { event: "DELETE", schema: "public", table: "test_tenant" }, + (payload) => { + if (payload.old.id === recordId) { + resultV2 = payload; + } + } + ) + .subscribe(); + + while (channelV1.state !== "joined" || channelV2.state !== "joined") { + await sleep(0.2); + } + + // Perform DELETE + await dbClient.queryArray( + `DELETE FROM public.test_tenant WHERE id = $1`, + [recordId] + ); + + while (resultV1 == null || resultV2 == null) { + await sleep(0.2); + } + + assertEquals(resultV1.old.id, recordId); + assertEquals(resultV2.old.id, recordId); + assertEquals(resultV1.eventType, "DELETE"); + assertEquals(resultV2.eventType, "DELETE"); + + await channelV1.unsubscribe(); + await channelV2.unsubscribe(); + + await stopClient(clientV1); + await stopClient(clientV2); + + clientV1 = null; + clientV2 = null; + }, 10000)); +}); async function stopClient(client: RealtimeClient | null) { if (client) {