@@ -7,15 +7,21 @@ import { WebsocketServerTransport } from '../../src/router/transport/WebSocketSe
77import { WebsocketDuplexConnection } from '../../src/router/transport/WebsocketDuplexConnection.js' ;
88import { Duplex } from 'stream' ;
99
10- const WS_PORT = process . env . WS_PORT ? parseInt ( process . env . WS_PORT ) : 4532 ;
11- const WS_ADDRESS = `ws://localhost:${ WS_PORT } ` ;
10+ let nextPort = 5433 ;
1211
1312describe ( 'Sockets' , ( ) => {
1413 let server : WebSocket . WebSocketServer ;
1514 let closeServer : ( ) => void ;
1615
16+ let WS_PORT = 0 ;
17+ let WS_ADDRESS = '' ;
18+
1719 beforeEach ( ( ) => {
1820 let closed = false ;
21+
22+ WS_PORT = process . env . WS_PORT ? parseInt ( process . env . WS_PORT ) : nextPort ++ ;
23+ WS_ADDRESS = `ws://localhost:${ WS_PORT } ` ;
24+
1925 server = new WebSocket . WebSocketServer ( {
2026 port : WS_PORT
2127 } ) ;
@@ -198,4 +204,92 @@ describe('Sockets', () => {
198204 await Promise . all ( promises ) ;
199205 await vi . waitFor ( ( ) => expect ( serverCancelSpy . mock . calls . length ) . equals ( testCount ) , { timeout : 2000 } ) ;
200206 } ) ;
207+
208+ /**
209+ * Similar to the above test, but checks for the case where
210+ * the server closes the connection due to a keepalive timeout.
211+ */
212+ it ( 'should handle closed server connections correctly' , async ( ) => {
213+ const transport = new WebsocketServerTransport ( {
214+ wsCreator : ( ) => server
215+ } ) ;
216+
217+ const onCancelWrapper = ( callback : ( ) => void ) => callback ( ) ;
218+ const serverCancelSpy = vi . fn ( onCancelWrapper ) ;
219+
220+ // Create a simple server which will spam a lot of data to any connection
221+ const rSocketServer = new RSocketServer ( {
222+ transport,
223+ acceptor : {
224+ accept : async ( ) => {
225+ return {
226+ requestStream : ( payload , initialN , responder ) => {
227+ let stop = false ;
228+
229+ setImmediate ( async ( ) => {
230+ while ( ! stop ) {
231+ // To trigger the issue, we need to send multiple individual large messages.
232+ // This builds up a buffer that will be sent after closing the connection.
233+ for ( let i = 0 ; i < 5 ; i ++ ) {
234+ responder . onNext ( { data : Buffer . from ( 'some payload' . repeat ( 100_000 ) ) } , false ) ;
235+ }
236+ await new Promise ( ( r ) => setTimeout ( r , 1 ) ) ;
237+ }
238+ } ) ;
239+ return {
240+ request : ( ) => { } ,
241+ onExtension : ( ) => { } ,
242+ cancel : ( ) => {
243+ serverCancelSpy ( ( ) => {
244+ stop = true ;
245+ } ) ;
246+ }
247+ } ;
248+ }
249+ } ;
250+ }
251+ }
252+ } ) ;
253+ rSocketServer . bind ( ) ;
254+
255+ // Try and connect 10 times. Without the fix,
256+ // more than 50% of these should fail.
257+ // The socket will be closed by the server
258+ const testCount = 10 ;
259+ const promises = new Array ( testCount ) . fill ( null ) . map ( async ( ) => {
260+ const testSocket = new WebSocket . WebSocket ( WS_ADDRESS ) ;
261+
262+ const connector = new RSocketConnector ( {
263+ transport : new WebsocketClientTransport ( {
264+ url : WS_ADDRESS ,
265+ wsCreator : ( url ) => testSocket as any
266+ } ) ,
267+
268+ setup : {
269+ dataMimeType : 'application/bson' ,
270+ metadataMimeType : 'application/bson' ,
271+
272+ keepAlive : 20000 ,
273+ // This should be long enough to trigger after the initial setup
274+ lifetime : 20 ,
275+
276+ payload : {
277+ data : null
278+ }
279+ }
280+ } ) ;
281+
282+ const connection = await connector . connect ( ) ;
283+
284+ connection . requestStream ( { data : null } , 1 , {
285+ onNext ( ) { } ,
286+ onComplete : ( ) => { } ,
287+ onExtension : ( ) => { } ,
288+ onError : ( ) => { }
289+ } ) ;
290+ } ) ;
291+
292+ await Promise . all ( promises ) ;
293+ await vi . waitFor ( ( ) => expect ( serverCancelSpy . mock . calls . length ) . equals ( testCount ) , { timeout : 2000 } ) ;
294+ } ) ;
201295} ) ;
0 commit comments