@@ -198,4 +198,92 @@ describe('Sockets', () => {
198198 await Promise . all ( promises ) ;
199199 await vi . waitFor ( ( ) => expect ( serverCancelSpy . mock . calls . length ) . equals ( testCount ) , { timeout : 2000 } ) ;
200200 } ) ;
201+
202+ /**
203+ * Similar to the above test, but checks for the case where
204+ * the server closes the connection due to a keepalive timeout.
205+ */
206+ it ( 'should handle closed server connections correctly' , async ( ) => {
207+ const transport = new WebsocketServerTransport ( {
208+ wsCreator : ( ) => server
209+ } ) ;
210+
211+ const onCancelWrapper = ( callback : ( ) => void ) => callback ( ) ;
212+ const serverCancelSpy = vi . fn ( onCancelWrapper ) ;
213+
214+ // Create a simple server which will spam a lot of data to any connection
215+ const rSocketServer = new RSocketServer ( {
216+ transport,
217+ acceptor : {
218+ accept : async ( ) => {
219+ return {
220+ requestStream : ( payload , initialN , responder ) => {
221+ let stop = false ;
222+
223+ setImmediate ( async ( ) => {
224+ while ( ! stop ) {
225+ // To trigger the issue, we need to send multiple individual large messages.
226+ // This builds up a buffer that will be sent after closing the connection.
227+ for ( let i = 0 ; i < 5 ; i ++ ) {
228+ responder . onNext ( { data : Buffer . from ( 'some payload' . repeat ( 100_000 ) ) } , false ) ;
229+ }
230+ await new Promise ( ( r ) => setTimeout ( r , 1 ) ) ;
231+ }
232+ } ) ;
233+ return {
234+ request : ( ) => { } ,
235+ onExtension : ( ) => { } ,
236+ cancel : ( ) => {
237+ serverCancelSpy ( ( ) => {
238+ stop = true ;
239+ } ) ;
240+ }
241+ } ;
242+ }
243+ } ;
244+ }
245+ }
246+ } ) ;
247+ rSocketServer . bind ( ) ;
248+
249+ // Try and connect 10 times. Without the fix,
250+ // more than 50% of these should fail.
251+ // The socket will be closed by the server
252+ const testCount = 10 ;
253+ const promises = new Array ( testCount ) . fill ( null ) . map ( async ( ) => {
254+ const testSocket = new WebSocket . WebSocket ( WS_ADDRESS ) ;
255+
256+ const connector = new RSocketConnector ( {
257+ transport : new WebsocketClientTransport ( {
258+ url : WS_ADDRESS ,
259+ wsCreator : ( url ) => testSocket as any
260+ } ) ,
261+
262+ setup : {
263+ dataMimeType : 'application/bson' ,
264+ metadataMimeType : 'application/bson' ,
265+
266+ keepAlive : 20000 ,
267+ // This should be long enough to trigger after the initial setup
268+ lifetime : 20 ,
269+
270+ payload : {
271+ data : null
272+ }
273+ }
274+ } ) ;
275+
276+ const connection = await connector . connect ( ) ;
277+
278+ connection . requestStream ( { data : null } , 1 , {
279+ onNext ( ) { } ,
280+ onComplete : ( ) => { } ,
281+ onExtension : ( ) => { } ,
282+ onError : ( ) => { }
283+ } ) ;
284+ } ) ;
285+
286+ await Promise . all ( promises ) ;
287+ await vi . waitFor ( ( ) => expect ( serverCancelSpy . mock . calls . length ) . equals ( testCount ) , { timeout : 2000 } ) ;
288+ } ) ;
201289} ) ;
0 commit comments