Skip to content

Commit b21bea7

Browse files
committed
[ECO-5260] feat: initial local proxy implementation
1 parent 7b754a1 commit b21bea7

File tree

7 files changed

+489
-21
lines changed

7 files changed

+489
-21
lines changed

.github/workflows/check.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
name: Check
2+
3+
on:
4+
pull_request:
5+
push:
6+
branches:
7+
- main
8+
9+
jobs:
10+
check:
11+
runs-on: ubuntu-latest
12+
steps:
13+
- uses: actions/checkout@v4
14+
15+
- uses: actions/setup-node@v4
16+
with:
17+
node-version: 20
18+
- run: npm ci
19+
20+
- name: Run test
21+
run: npm run test

src/async-queue.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
type AsyncTask = () => Promise<void>;
2+
3+
export class AsyncQueue {
4+
private readonly queue: AsyncTask[] = [];
5+
private processing: boolean = false;
6+
7+
enqueue(task: AsyncTask) {
8+
return new Promise<void>((resolve, reject) => {
9+
this.queue.push(async () => {
10+
try {
11+
await task();
12+
resolve();
13+
} catch (e) {
14+
reject(e);
15+
}
16+
});
17+
this.processNext();
18+
});
19+
}
20+
21+
private async processNext() {
22+
if (this.processing || this.queue.length === 0) return;
23+
24+
this.processing = true;
25+
26+
const task = this.queue.shift();
27+
28+
try {
29+
await task!!();
30+
} finally {
31+
this.processing = false;
32+
this.processNext();
33+
}
34+
}
35+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './proxy.js';

src/proxy.test.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { beforeAll, describe, expect, it } from 'vitest';
2+
import WebSocket, { WebSocketServer } from 'ws';
3+
import { createInterceptingProxy, type InterceptingProxy } from './proxy.js';
4+
import http from 'http';
5+
6+
describe('messages integration', () => {
7+
let proxy: InterceptingProxy;
8+
9+
beforeAll(async () => {
10+
const server = await startEchoServer();
11+
proxy = createInterceptingProxy({
12+
realtimeHost: 'localhost',
13+
restHost: 'localhost',
14+
port: 8080,
15+
tls: false,
16+
});
17+
await proxy.start();
18+
return () => new Promise<void>((resolve) => server.close(() => resolve()));
19+
});
20+
21+
it('test', async () => {
22+
const unregister = proxy.registerRealtimeMiddleware(async (message, messageType) => {
23+
if (messageType === 'incoming') {
24+
return { command: 'replace', replacement: { action: 3 } };
25+
} else {
26+
return { command: 'keep' };
27+
}
28+
});
29+
30+
const testWs = new WebSocket(`ws://localhost:${proxy.options.port}`);
31+
32+
await openConnection(testWs);
33+
34+
await runAndObserve(
35+
async () => testWs.send(JSON.stringify({ action: 1 })),
36+
async () => {
37+
const message = await proxy.observeNextIncomingProtocolMessage();
38+
const rawWebsocketMessage = await waitForNextReceivedMessage(testWs);
39+
expect(message).toEqual({ action: 3 });
40+
expect(rawWebsocketMessage.toString()).toBe('{"action":3}');
41+
},
42+
);
43+
44+
unregister();
45+
testWs.close();
46+
});
47+
});
48+
49+
async function startEchoServer(port = 8080) {
50+
const echoServer = http.createServer((req, res) => {
51+
const bodyChunks: any[] = [];
52+
req.on('data', (chunk) => bodyChunks.push(chunk));
53+
req.on('end', () => {
54+
const requestData = Buffer.concat(bodyChunks).toString();
55+
res.writeHead(200);
56+
res.end(requestData);
57+
});
58+
});
59+
60+
const wss = new WebSocketServer({ server: echoServer, path: '/' });
61+
62+
wss.on('connection', (ws) => {
63+
ws.on('message', (message) => {
64+
ws.send(message);
65+
});
66+
});
67+
68+
await new Promise<void>((resolve) => {
69+
echoServer.listen(port, () => resolve());
70+
});
71+
72+
return echoServer;
73+
}
74+
75+
async function runAndObserve(runner: () => Promise<void>, observer: () => Promise<void>) {
76+
const observerPromise = observer();
77+
await runner();
78+
await observerPromise;
79+
}
80+
81+
async function openConnection(ws: WebSocket) {
82+
return new Promise<void>((resolve) => ws.once('open', () => resolve()));
83+
}
84+
85+
async function waitForNextReceivedMessage(ws: WebSocket) {
86+
return new Promise<WebSocket.RawData>((resolve) => {
87+
ws.once('message', (data) => {
88+
resolve(data);
89+
});
90+
});
91+
}

0 commit comments

Comments
 (0)