forked from CassiopeiaCode/q2api
-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathevent_stream_parser.ts
More file actions
125 lines (103 loc) · 4.15 KB
/
event_stream_parser.ts
File metadata and controls
125 lines (103 loc) · 4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
export class EventStreamParser {
static parseHeaders(headersData: Uint8Array): Record<string, any> {
const headers: Record<string, any> = {};
let offset = 0;
const decoder = new TextDecoder("utf-8");
while (offset < headersData.byteLength) {
if (offset >= headersData.byteLength) break;
const nameLength = headersData[offset];
offset += 1;
if (offset + nameLength > headersData.byteLength) break;
const name = decoder.decode(headersData.subarray(offset, offset + nameLength));
offset += nameLength;
if (offset >= headersData.byteLength) break;
const valueType = headersData[offset];
offset += 1;
if (offset + 2 > headersData.byteLength) break;
const view = new DataView(headersData.buffer, headersData.byteOffset + offset, headersData.byteLength - offset);
const valueLength = view.getUint16(0, false);
offset += 2;
if (offset + valueLength > headersData.byteLength) break;
if (valueType === 7) {
const value = decoder.decode(headersData.subarray(offset, offset + valueLength));
headers[name] = value;
} else {
const value = headersData.subarray(offset, offset + valueLength);
headers[name] = value;
}
offset += valueLength;
}
return headers;
}
static parseMessage(data: Uint8Array): { headers: Record<string, any>, payload: any, total_length: number } | null {
if (data.byteLength < 16) return null;
const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
const totalLength = view.getUint32(0, false);
const headersLength = view.getUint32(4, false);
if (data.byteLength < totalLength) {
// Incomplete message
return null;
}
const headersData = data.subarray(12, 12 + headersLength);
const headers = EventStreamParser.parseHeaders(headersData);
const payloadStart = 12 + headersLength;
const payloadEnd = totalLength - 4; // Skip Message CRC (last 4 bytes)
const payloadData = data.subarray(payloadStart, payloadEnd);
let payload = null;
if (payloadData.length > 0) {
try {
const text = new TextDecoder("utf-8").decode(payloadData);
payload = JSON.parse(text);
} catch {
payload = payloadData;
}
}
return {
headers,
payload,
total_length: totalLength
};
}
static async *parseStream(stream: ReadableStream<Uint8Array>): AsyncGenerator<any> {
const reader = stream.getReader();
let buffer = new Uint8Array(0);
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) {
const newBuffer = new Uint8Array(buffer.length + value.length);
newBuffer.set(buffer);
newBuffer.set(value, buffer.length);
buffer = newBuffer;
}
while (buffer.length >= 12) {
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
const totalLength = view.getUint32(0, false);
if (buffer.length < totalLength) break;
const messageData = buffer.subarray(0, totalLength);
buffer = buffer.subarray(totalLength);
const message = EventStreamParser.parseMessage(messageData);
if (message) {
yield message;
}
}
}
} finally {
reader.releaseLock();
}
}
}
export function extractEventInfo(message: any): any {
const headers = message.headers || {};
const payload = message.payload;
const eventType = headers[':event-type'] || headers['event-type'];
const contentType = headers[':content-type'] || headers['content-type'];
const messageType = headers[':message-type'] || headers['message-type'];
return {
event_type: eventType,
content_type: contentType,
message_type: messageType,
payload: payload
};
}