-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNodeServer.js
More file actions
401 lines (333 loc) · 10.3 KB
/
NodeServer.js
File metadata and controls
401 lines (333 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
const os = require( 'os' );
const fs = require( 'fs' );
const WebSocketModule = require('ws');
const Pop = require('./PopApi');
const ExpressModule = require('express');
function NumberOrNull(Value,Name)
{
if ( Value == 'null' )
return null;
const Num = Number(Value);
if ( isNaN(Num) )
throw `${Value}(${Name}) needs to be a number`;
return Value;
}
function IntegerOrNull(Value,Name)
{
const Num = NumberOrNull(Value);
if ( Num === null )
return null;
if ( !Number.isInteger(Num) )
throw `${Value}(${Name}) needs to be integer`;
return Num;
}
// gr: can I use assign()?
const CorsOrigin = process.env.CorsOrigin || '*';
const ErrorStatusCode = IntegerOrNull( process.env.ErrorStatusCode || 500, 'ErrorStatusCode' );
const StaticFilesPath = process.env.StaticFilesPath || './';
const PullPort = IntegerOrNull( process.env.PullPort || 80, 'PullPort' );
const PushPort = IntegerOrNull( process.env.PushPort || 80, 'PushPort' );
const ArtifactFileTries = IntegerOrNull( process.env.ArtifactFileTries || 20, 'ArtifactFileTries' );
const ArtifactPath = process.env.ArtifactPath || './Artifacts';
const ArtifactSizeLimit = NumberOrNull(process.env.ArtifactSizeLimit || (200*1024*1024), 'ArtifactSizeLimit' );
try
{
const AllEnv = JSON.stringify(process.env,null,'\t');
console.log(`env (all) ${AllEnv}`);
}
catch(e)
{
console.log(`env (all) error -> ${e}`);
}
const RecordStreamPacketDelin = 'Pop\n'; // gr: i insert this before every packet when writing files, so we need it here too.
const ArtifactUrlPattern = new RegExp('\/([A-Za-z]){4}$')
const ArtifactAlphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
const ArtifactListUrl = '/ls';
// gr: for sloppy, we can only expose one port (I think), so we share the http server
const SharingHttpServer = ( PullPort == PushPort );
// artifact server
const HttpServerApp = ExpressModule();
HttpServerApp.get(ArtifactUrlPattern,HandleGetArtifact);
HttpServerApp.get(ArtifactListUrl,HandleGetFileList);
HttpServerApp.get('/', function (req, res) { res.redirect('/index.html') });
HttpServerApp.use('/', ExpressModule.static(StaticFilesPath));
const HttpServer = HttpServerApp.listen( PullPort, () => console.log( `Pull http server on ${JSON.stringify(HttpServer.address())}` ) );
// artifact recorder
const WebSocketOptions = SharingHttpServer ? { noServer:SharingHttpServer } : { port: PushPort };
const WebSocketServer = new WebSocketModule.Server(WebSocketOptions);
WebSocketServer.on('connection',StartWebsocketClientThread);
if ( SharingHttpServer )
{
HttpServer.on('upgrade', (request, socket, head) =>
{
WebSocketServer.handleUpgrade(request, socket, head, socket =>
{
WebSocketServer.emit('connection', socket, request);
}
);
});
// get an exception if we use this address() in no-server mode
WebSocketServer.address = HttpServer.address;
}
console.log(`Push websocket server on ${JSON.stringify(WebSocketServer.address())}`);
function GetNewArtifactName()
{
// random X chars in alphabet
const a = Pop.RandomInt(0,ArtifactAlphabet.length);
const b = Pop.RandomInt(0,ArtifactAlphabet.length);
const c = Pop.RandomInt(0,ArtifactAlphabet.length);
const d = Pop.RandomInt(0,ArtifactAlphabet.length);
const abcd = [a,b,c,d].map( i => ArtifactAlphabet[i] );
return abcd.join('');
}
function GetArtifactFilename(ArtifactName)
{
return `${ArtifactPath}/${ArtifactName}.PopCap`;
}
class TArtifact
{
constructor(Name)
{
this.Name = Name;
this.Filename = GetArtifactFilename(this.Name);
this.Error = null;
this.Stream = null;
this.BytesWritten = 0;
const Options = {};
Options.flags = 'wx'; // x = fail if exists
this.Stream = fs.createWriteStream(this.Filename,Options);
this.Stream.on('error', this.OnError.bind(this) );
}
OnError(Error)
{
console.log(`Artifact stream error ${Error}`);
this.Error = Error || 'Unspecified stream error';
}
WritePacket(Data)
{
// file has errored!
if ( this.Error !== null )
{
console.log(`Error failed as file has failed ${this.Error}`);
throw this.Error;
}
this.Stream.write(RecordStreamPacketDelin);
this.Stream.write(Data);
//console.log(`Wrote x${Data.length}`);
this.BytesWritten += RecordStreamPacketDelin.length;
this.BytesWritten += Data.length;
}
GetSize()
{
return this.BytesWritten;
}
}
async function OpenNewArtifactStream()
{
let Error = null;
function OnError(Error)
{
// https://stackoverflow.com/questions/20864036/error-handling-on-createwritestream
//file.read();
console.log(`Open file error ${Error}`);
}
// how many tries do we allow?
// how do we do this better... we're also relying on the file system to be atomic
for ( let i=0; i<ArtifactFileTries; i++ )
{
const Name = GetNewArtifactName();
try
{
const Artifact = new TArtifact(Name);
return Artifact;
}
catch(e)
{
console.log(`Failed to open stream for artifact name ${Name}; ${e}`);
}
}
throw `Failed to open new artifact name after ${ArtifactFileTries} tries`;
}
function StartWebsocketClientThread(Client,Request)
{
console.log(`StartWebsocketClientThread(${Client},${Request})`);
const ClientAddress = Request.socket.remoteAddress;
function Disconnect(Error)
{
console.log(`Disconnect(${Error})`);
// if error, send hail mary message before disconnecting
if ( Error )
{
console.log(`Disconnect, sending hail mary error message ${Error}`);
const Message = {};
Message.Error = Error;
Client.send(JSON.stringify(Message));
}
Client.terminate();
}
Client.on('error',Disconnect);
WebsocketClientThread(Client).then(Disconnect).catch(Disconnect);
}
async function WebsocketClientThread(Client)
{
let Connected = true;
let RecvMessageQueue = new Pop.PromiseQueue('RecvMessageQueue');
function OnMessage(Message)
{
RecvMessageQueue.Push(Message);
}
function OnClosed()
{
Connected = false;
}
async function WaitForMessage()
{
if ( !Connected )
throw `Disconnected`;
throw `todo WaitForMessage`;
}
async function SendAndWaitForReply(Message)
{
if ( typeof Message == 'object' )
Message = JSON.stringify(Message);
Client.send(Message);
const Reply = await RecvMessageQueue.WaitForNext();
// check reply has a specific reply info
return Reply;
}
Client.on('message',OnMessage);
Client.on('close',OnClosed);
// send "disclaimer"
// wait for an auth message
// open file with random handle
const Artifact = await OpenNewArtifactStream();
{
// add meta here
const StreamInitMessage = {};
StreamInitMessage.CmsInit = true;
Artifact.WritePacket(JSON.stringify(StreamInitMessage));
}
// send back Artifact Name
{
const ArtifactMessage = {};
ArtifactMessage.Command = 'Hello';
ArtifactMessage.ArtifactName = Artifact.Name;
const ArtifactAck = await SendAndWaitForReply(ArtifactMessage);
// if ArtifactAck != something error
}
async function RecvLoop()
{
// write everything we recieve into the file
while(Connected)
{
const Message = await RecvMessageQueue.WaitForNext();
Artifact.WritePacket(Message);
// stop thread once we exceed size
if ( ArtifactSizeLimit !== null )
{
const ArtifactSize = Artifact.GetSize();
if ( ArtifactSize >= ArtifactSizeLimit )
throw `Stream has exceeded size limit (${ArtifactSize} >= ${ArtifactSizeLimit})`;
}
}
}
function GetPingMessage()
{
const Message = {};
Message.Command = 'Ping';
return Message;
}
async function PingLoop()
{
// occasionally send status back, this also will let us know if socket disconnects
while(Connected)
{
await Pop.Yield(10*1000);
const Message = GetPingMessage();
Client.send( JSON.stringify(Message) );
}
}
// now absorb messages and write to file
const ReadLoopFinished = RecvLoop();
// todo: check for size limits & stop (in ping loop?)
const PingLoopFinished = PingLoop();
// wait for a thread to exit (probbaly disconnected)
await Promise.race([ReadLoopFinished,PingLoopFinished]);
return 'WebsocketClientThread finished';
}
async function GetArtifactPipe(Request)
{
const ArtifactName = Request.path.slice(1).toUpperCase(); // strip / and we allow lower case letters, but artifacts are uppercase
console.log(`Requst artifact ${ArtifactName}`);
const ArtifactFilename = GetArtifactFilename(ArtifactName);
// gr: this doesn't 404 here...
const Stream = fs.createReadStream(ArtifactFilename);
return Stream;
}
async function HandleGetArtifact(Request,Response)
{
try
{
const ArtifactStream = await GetArtifactPipe(Request);
const Output = {};
Output.Mime = 'application/octet-stream';
const StreamFinished = Pop.CreatePromise();
ArtifactStream.on('end', StreamFinished.Resolve );
ArtifactStream.on('error', StreamFinished.Reject );
// PopImageServer generic code
//const Output = await RunApp(Request);
Output.StatusCode = Output.StatusCode || 200;
Output.Mime = Output.Mime || 'text/plain';
Response.statusCode = Output.StatusCode;
Response.setHeader('Content-Type',Output.Mime);
Response.setHeader('Access-Control-Allow-Origin',CorsOrigin); // allow CORS
ArtifactStream.pipe(Response);
// should this wait until end event?
// we kinda need a way to stop if there was an error and not pipe until then?
console.log(`Wait for stream finished`);
await StreamFinished;
Response.end();
//Response.end(Output.Output);
}
catch (e)
{
console.log(`RunApp error -> ${e}`);
Response.statusCode = ErrorStatusCode;
Response.setHeader('Content-Type','text/plain');
Response.end(`Error ${e}`);
}
}
async function HandleGetFileList(Request,Response)
{
try
{
// get list of files as a promise
const FileListPromise = Pop.CreatePromise();
function OnReadDir(Error,Files)
{
if ( Error )
FileListPromise.Reject(Error);
else
FileListPromise.Resolve(Files);
}
const Options = { withFileTypes: true };
fs.readdir(ArtifactPath, Options, OnReadDir );
const Files = await FileListPromise;
// gr: now filter artifact names, get some meta etc
const Output = {};
Output.Output = JSON.stringify(Files,null,'\t');
// PopImageServer generic code
Output.StatusCode = Output.StatusCode || 200;
Output.Mime = Output.Mime || 'text/plain';
Response.statusCode = Output.StatusCode;
Response.setHeader('Content-Type',Output.Mime);
Response.end(Output.Output);
}
catch (e)
{
console.log(`HandleGetList error -> ${e}`);
Response.statusCode = ErrorStatusCode;
Response.setHeader('Content-Type','text/plain');
Response.end(`Error ${e}`);
}
}