From 7dc71c6a3eec00f548f7f1e6801fd68ebd95d515 Mon Sep 17 00:00:00 2001 From: r4id4h Date: Sun, 10 Aug 2025 13:11:45 +0200 Subject: [PATCH] Add synchronous event listener --- lib/http-proxy/passes/ws-incoming.js | 97 +++++++++++++++++++++------- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/lib/http-proxy/passes/ws-incoming.js b/lib/http-proxy/passes/ws-incoming.js index 270f23f45..ee3d0ad7f 100644 --- a/lib/http-proxy/passes/ws-incoming.js +++ b/lib/http-proxy/passes/ws-incoming.js @@ -2,6 +2,50 @@ var http = require('http'), https = require('https'), common = require('../common'); +function isPromise(x) { return !!x && typeof x.then === 'function'; } + +async function runAsyncListenersSequentially(emitter, event, args, timeoutMs) { + const listeners = emitter ? emitter.listeners(event) : []; + for (const fn of listeners) { + if (fn.length > args.length) { + await new Promise((resolve, reject) => { + let settled = false; + let timer = null; + const done = (err) => { + if (timer) clearTimeout(timer); + if (!settled) { + settled = true; + err ? reject(err) : resolve(); + } + }; + try { + if (timeoutMs) { + timer = setTimeout(() => done(new Error(event + ' hook timeout')), timeoutMs); + } + const ret = fn.call(emitter, ...args, done); + if (isPromise(ret)) { + ret.then(() => done(), done); + } + } catch (e) { + done(e); + } + }); + } else { + const ret = fn.call(emitter, ...args); + if (isPromise(ret)) { + if (timeoutMs) { + await Promise.race([ + ret, + new Promise((_, rej) => setTimeout(() => rej(new Error(event + ' hook timeout')), timeoutMs)) + ]); + } else { + await ret; + } + } + } + } +} + /*! * Array of passes. * @@ -15,18 +59,16 @@ var http = require('http'), * */ - module.exports = { /** * WebSocket requests must have the `GET` method and * the `upgrade:websocket` header * * @param {ClientRequest} Req Request object - * @param {Socket} Websocket + * @param {Socket} Websocket * * @api private */ - checkMethodAndHeader : function checkMethodAndHeader(req, socket) { if (req.method !== 'GET' || !req.headers.upgrade) { socket.destroy(); @@ -43,12 +85,11 @@ module.exports = { * Sets `x-forwarded-*` headers if specified in config. * * @param {ClientRequest} Req Request object - * @param {Socket} Websocket + * @param {Socket} Websocket * @param {Object} Options Config object passed to the proxy * * @api private */ - XHeaders : function XHeaders(req, socket, options) { if(!options.xfwd) return; @@ -60,9 +101,9 @@ module.exports = { ['for', 'port', 'proto'].forEach(function(header) { req.headers['x-forwarded-' + header] = - (req.headers['x-forwarded-' + header] || '') + - (req.headers['x-forwarded-' + header] ? ',' : '') + - values[header]; + (req.headers['x-forwarded-' + header] || '') + + (req.headers['x-forwarded-' + header] ? ',' : '') + + values[header]; }); }, @@ -71,7 +112,7 @@ module.exports = { * send the Switching Protocols request and pipe the sockets. * * @param {ClientRequest} Req Request object - * @param {Socket} Websocket + * @param {Socket} Websocket * @param {Object} Options Config object passed to the proxy * * @api private @@ -92,21 +133,17 @@ module.exports = { } return head; }, [line]) - .join('\r\n') + '\r\n\r\n'; - } + .join('\r\n') + '\r\n\r\n'; + }; common.setupSocket(socket); if (head && head.length) socket.unshift(head); - var proxyReq = (common.isSSL.test(options.target.protocol) ? https : http).request( - common.setupOutgoing(options.ssl || {}, options, req) + common.setupOutgoing(options.ssl || {}, options, req) ); - // Enable developers to modify the proxyReq before headers are sent - if (server) { server.emit('proxyReqWs', proxyReq, req, socket, options, head); } - // Error Handler proxyReq.on('error', onOutgoingError); proxyReq.on('response', function (res) { @@ -122,7 +159,7 @@ module.exports = { // Allow us to listen when the websocket has completed proxySocket.on('end', function () { - server.emit('close', proxyRes, proxySocket, proxyHead); + server && server.emit('close', proxyRes, proxySocket, proxyHead); }); // The pipe below will end proxySocket if socket closes cleanly, but not @@ -144,19 +181,35 @@ module.exports = { proxySocket.pipe(socket).pipe(proxySocket); - server.emit('open', proxySocket); - server.emit('proxySocket', proxySocket); //DEPRECATED. + server && server.emit('open', proxySocket); + server && server.emit('proxySocket', proxySocket); // DEPRECATED. }); - return proxyReq.end(); // XXX: CHECK IF THIS IS THIS CORRECT + const hookTimeout = options && options.proxyReqWsTimeout; // optional in ms + + (async () => { + try { + if (server && typeof server.listenerCount === 'function' && server.listenerCount('proxyReqWs') > 0) { + await runAsyncListenersSequentially( + server, + 'proxyReqWs', + [proxyReq, req, socket, options, head], + hookTimeout + ); + } + proxyReq.end(); // nach Abschluss aller Hooks + } catch (err) { + onOutgoingError(err); + } + })(); function onOutgoingError(err) { if (clb) { clb(err, req, socket); - } else { + } else if (server && typeof server.emit === 'function') { server.emit('error', err, req, socket); } - socket.end(); + try { socket.end(); } catch (_) {} } } };