diff --git a/packages/discv5/src/kademlia/bucket.ts b/packages/discv5/src/kademlia/bucket.ts index 340a96df..849f4cd8 100644 --- a/packages/discv5/src/kademlia/bucket.ts +++ b/packages/discv5/src/kademlia/bucket.ts @@ -82,7 +82,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) { return InsertResult.NodeExists; } - const isPendingNode = this.pending?.value.nodeId === value.nodeId; + const isPendingNode = this.pending && this.pending.value.nodeId === value.nodeId; switch (status) { case EntryStatus.Connected: { @@ -141,7 +141,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) { } else { return UpdateResult.NotModified; } - } else if (this.pending?.value.nodeId === value.nodeId) { + } else if (this.pending && this.pending.value.nodeId === value.nodeId) { this.pending.value = value; return UpdateResult.UpdatedPending; } else { @@ -192,7 +192,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) { default: throw new Error("Unreachable"); } - } else if (this.pending?.value.nodeId === id) { + } else if (this.pending && this.pending.value.nodeId === id) { this.pending.status = status; return UpdateResult.UpdatedPending; } else { diff --git a/packages/discv5/src/rateLimit/index.ts b/packages/discv5/src/rateLimit/index.ts index dbc99cf7..3843e482 100644 --- a/packages/discv5/src/rateLimit/index.ts +++ b/packages/discv5/src/rateLimit/index.ts @@ -43,7 +43,9 @@ export class RateLimiter implements IRateLimiter { } if (!this.rateLimiterIP.allows(ip, 1)) { - this.metrics?.rateLimitHitIP.inc(); + if (this.metrics) { + this.metrics.rateLimitHitIP.inc(); + } this.bannedIPs.set(ip, Date.now()); @@ -51,7 +53,9 @@ export class RateLimiter implements IRateLimiter { } if (!this.rateLimiterGlobal.allows(null, 1)) { - this.metrics?.rateLimitHitTotal.inc(); + if (this.metrics) { + this.metrics.rateLimitHitTotal.inc(); + } return false; } diff --git a/packages/discv5/src/service/addrVotes.ts b/packages/discv5/src/service/addrVotes.ts index 52f84c70..b1364d7c 100644 --- a/packages/discv5/src/service/addrVotes.ts +++ b/packages/discv5/src/service/addrVotes.ts @@ -22,7 +22,7 @@ export class AddrVotes { const socketAddrStr = serializeSocketAddr(ip); const prevVote = this.votes.get(voter); - if (prevVote?.socketAddrStr === socketAddrStr) { + if (prevVote && prevVote.socketAddrStr === socketAddrStr) { // Same vote, ignore return false; } else if (prevVote !== undefined) { diff --git a/packages/discv5/src/service/service.ts b/packages/discv5/src/service/service.ts index a35062e3..ca277572 100644 --- a/packages/discv5/src/service/service.ts +++ b/packages/discv5/src/service/service.ts @@ -485,7 +485,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { const enr = this.findEnr(peer); if (!enr || !getSocketAddressMultiaddrOnENR(enr, this.ipMode)) { log("Lookup %s requested an unknown ENR or ENR w/o UDP", lookupId); - this.activeLookups.get(lookupId)?.onFailure(peer); + const lookup = this.activeLookups.get(lookupId); + if (lookup) lookup.onFailure(peer); return; } @@ -512,7 +513,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { log("Sending %s to node: %o", MessageType[activeRequest.request.type], nodeAddr); try { this.sessionService.sendRequest(activeRequest.contact, activeRequest.request); - this.metrics?.sentMessageCount.inc({ type: MessageType[activeRequest.request.type] }); + if (this.metrics) { + this.metrics.sentMessageCount.inc({ type: MessageType[activeRequest.request.type] }); + } } catch (e) { this.activeRequests.delete(bytesToBigint(activeRequest.request.id)); log("Error sending RPC to node: %o, :Error: %s", nodeAddr, (e as Error).message); @@ -526,7 +529,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { log("Sending %s to node: %o", MessageType[response.type], nodeAddr); try { this.sessionService.sendResponse(nodeAddr, response); - this.metrics?.sentMessageCount.inc({ type: MessageType[response.type] }); + if (this.metrics) { + this.metrics.sentMessageCount.inc({ type: MessageType[response.type] }); + } } catch (e) { log("Error sending RPC to node: %o, :Error: %s", nodeAddr, (e as Error).message); } @@ -735,7 +740,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { */ private handleRpcRequest = (nodeAddr: INodeAddress, request: RequestMessage): void => { const requestType = MessageType[request.type]; - this.metrics?.rcvdMessageCount.inc({ type: requestType }); + if (this.metrics) { + this.metrics.rcvdMessageCount.inc({ type: requestType }); + } try { switch (request.type) { @@ -777,7 +784,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { log("Sending PONG response to node: %o", nodeAddr); try { this.sessionService.sendResponse(nodeAddr, pongMessage); - this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.PONG] }); + if (this.metrics) { + this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.PONG] }); + } } catch (e) { log("Failed to send Pong. Error %s", (e as Error).message); } @@ -809,7 +818,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { log("Sending empty NODES response to %o", nodeAddr); try { this.sessionService.sendResponse(nodeAddr, createNodesMessage(id, 1, nodes)); - this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] }); + if (this.metrics) { + this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.NODES] }); + } } catch (e) { log("Failed to send a NODES response. Error: %s", (e as Error).message); } @@ -828,7 +839,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { const _nodes = nodes.slice(i, i + nodesPerPacket); try { this.sessionService.sendResponse(nodeAddr, createNodesMessage(id, total, _nodes)); - this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] }); + if (this.metrics) { + this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.NODES] }); + } } catch (e) { log("Failed to send a NODES response. Error: %s", (e as Error).message); } @@ -847,7 +860,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { */ private handleRpcResponse = (nodeAddr: INodeAddress, response: ResponseMessage): void => { const responseType = MessageType[response.type]; - this.metrics?.rcvdMessageCount.inc({ type: responseType }); + if (this.metrics) { + this.metrics.rcvdMessageCount.inc({ type: responseType }); + } // verify we know of the rpc id @@ -867,21 +882,25 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { nodeAddr, response.id ); - activeRequest.callbackPromise?.reject( - new CodeError( - "Received a response from an nexpected address", - ResponseErrorType[ResponseErrorType.WrongAddress] - ) - ); + if (activeRequest.callbackPromise) { + activeRequest.callbackPromise.reject( + new CodeError( + "Received a response from an nexpected address", + ResponseErrorType[ResponseErrorType.WrongAddress] + ) + ); + } return; } // Check that the response type matches the request if (!requestMatchesResponse(activeRequest.request, response)) { log("Node gave an incorrect response type. Ignoring response from: %o", nodeAddr); - activeRequest.callbackPromise?.reject( - new CodeError("Response has incorrect response type", ResponseErrorType[ResponseErrorType.WrongResponseType]) - ); + if (activeRequest.callbackPromise) { + activeRequest.callbackPromise.reject( + new CodeError("Response has incorrect response type", ResponseErrorType[ResponseErrorType.WrongResponseType]) + ); + } return; } @@ -901,13 +920,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { // TODO Implement all RPC methods return; } - - activeRequest.callbackPromise?.resolve(toResponseType(response)); + if (activeRequest.callbackPromise) { + activeRequest.callbackPromise.resolve(toResponseType(response)); + } } catch (e) { log("Error handling rpc response: node: %o response: %s", nodeAddr, responseType); - activeRequest.callbackPromise?.reject( - new CodeError((e as Error).message, ResponseErrorType[ResponseErrorType.InternalError]) - ); + if (activeRequest.callbackPromise) { + activeRequest.callbackPromise.reject( + new CodeError((e as Error).message, ResponseErrorType[ResponseErrorType.InternalError]) + ); + } } }; @@ -1056,6 +1078,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) { this.connectionUpdated(nodeId, { type: ConnectionStatusType.Disconnected }); // If this is initiated by the user, return the error on the callback. - callbackPromise?.reject(new CodeError("RPC failure", RequestErrorType[error])); + if (callbackPromise) { + callbackPromise.reject(new CodeError("RPC failure", RequestErrorType[error])); + } }; } diff --git a/packages/discv5/src/session/service.ts b/packages/discv5/src/session/service.ts index 25a8ee53..a1e6fca7 100644 --- a/packages/discv5/src/session/service.ts +++ b/packages/discv5/src/session/service.ts @@ -286,7 +286,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte // table (remote_enr is None) then we re-request the ENR to keep the session up to date. // send the challenge - const enrSeq = remoteEnr?.seq ?? 0n; + const enrSeq = (remoteEnr && remoteEnr.seq) ?? 0n; const packet = createWhoAreYouPacket(nonce, enrSeq); const challengeData = encodeChallengeData(packet.maskingIv, packet.header); @@ -485,7 +485,9 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte const enrMultiaddrIP6 = getSocketAddressMultiaddrOnENR(enr, { ...this.ipMode, ip4: false } as IPMode); return ( enr.nodeId === nodeAddr.nodeId && - (enrMultiaddrIP4?.equals(nodeAddr.socketAddr) ?? enrMultiaddrIP6?.equals(nodeAddr.socketAddr) ?? true) + ((enrMultiaddrIP4 && enrMultiaddrIP4.equals(nodeAddr.socketAddr)) ?? + (enrMultiaddrIP6 && enrMultiaddrIP6.equals(nodeAddr.socketAddr)) ?? + true) ); } @@ -759,11 +761,15 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte } private removeExpectedResponse(socketAddr: Multiaddr): void { - this.transport.addExpectedResponse?.(socketAddr.toOptions().host); + if (this.transport.addExpectedResponse) { + this.transport.addExpectedResponse(socketAddr.toOptions().host); + } } private addExpectedResponse(socketAddr: Multiaddr): void { - this.transport.removeExpectedResponse?.(socketAddr.toOptions().host); + if (this.transport.removeExpectedResponse) { + this.transport.removeExpectedResponse(socketAddr.toOptions().host); + } } private handleRequestTimeout(nodeAddr: INodeAddress, requestCall: IRequestCall): void { diff --git a/packages/discv5/src/transport/udp.ts b/packages/discv5/src/transport/udp.ts index 253bb4ed..f01e1c74 100644 --- a/packages/discv5/src/transport/udp.ts +++ b/packages/discv5/src/transport/udp.ts @@ -86,26 +86,29 @@ export class UDPTransportService } public async start(): Promise { - const [socket4, socket6] = await Promise.all([ - this.ip4 ? openSocket(this.ip4.opts) : undefined, - this.ip6 ? openSocket(this.ip6.opts) : undefined, - ]); - if (this.ip4) { - socket4?.on("message", this.handleIncoming); - this.ip4.socket = socket4; - } - if (this.ip6) { - socket6?.on("message", this.handleIncoming); - this.ip6.socket = socket6; - } + const configs: SocketOpts[] = []; + if (this.ip4) configs.push(this.ip4); + if (this.ip6) configs.push(this.ip6); + + const sockets = await Promise.all(configs.map((c) => openSocket(c.opts))); + + configs.forEach((config, index) => { + const socket = sockets[index]; + socket.on("message", this.handleIncoming); + config.socket = socket; + }); } public async stop(): Promise { - const socket4 = this.ip4?.socket; - const socket6 = this.ip6?.socket; - socket4?.off("message", this.handleIncoming); - socket6?.off("message", this.handleIncoming); - await Promise.all([closeSocket(socket4), closeSocket(socket6)]); + const sockets: dgram.Socket[] = []; + if (this.ip4 && this.ip4.socket) sockets.push(this.ip4.socket); + if (this.ip6 && this.ip6.socket) sockets.push(this.ip6.socket); + + sockets.forEach((socket) => { + socket.off("message", this.handleIncoming); + }); + + await Promise.all(sockets.map((socket) => closeSocket(socket))); } public async send(to: Multiaddr, toId: string, packet: IPacket): Promise { @@ -114,21 +117,27 @@ export class UDPTransportService if (!this.ip4) { throw new Error("Cannot send to an IPv4 address without a bound IPv4 socket"); } - this.ip4.socket?.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host); + if (this.ip4.socket) { + this.ip4.socket.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host); + } } else if (nodeAddr.family === 6) { if (!this.ip6) { throw new Error("Cannot send to an IPv6 address without a bound IPv6 socket"); } - this.ip6.socket?.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host); + if (this.ip6.socket) { + this.ip6.socket.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host); + } } } addExpectedResponse(ipAddress: string): void { - this.rateLimiter?.addExpectedResponse(ipAddress); + if (!this.rateLimiter) return; + this.rateLimiter.addExpectedResponse(ipAddress); } removeExpectedResponse(ipAddress: string): void { - this.rateLimiter?.removeExpectedResponse(ipAddress); + if (!this.rateLimiter) return; + this.rateLimiter.removeExpectedResponse(ipAddress); } getContactableAddr(enr: ENR): SocketAddress | undefined {