Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/discv5/src/kademlia/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) {
return InsertResult.NodeExists;
}

const isPendingNode = this.pending?.value.nodeId === value.nodeId;
const isPendingNode = this.pending && this.pending.value.nodeId === value.nodeId;

switch (status) {
case EntryStatus.Connected: {
Expand Down Expand Up @@ -141,7 +141,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) {
} else {
return UpdateResult.NotModified;
}
} else if (this.pending?.value.nodeId === value.nodeId) {
} else if (this.pending && this.pending.value.nodeId === value.nodeId) {
this.pending.value = value;
return UpdateResult.UpdatedPending;
} else {
Expand Down Expand Up @@ -192,7 +192,7 @@ export class Bucket extends (EventEmitter as { new (): BucketEventEmitter }) {
default:
throw new Error("Unreachable");
}
} else if (this.pending?.value.nodeId === id) {
} else if (this.pending && this.pending.value.nodeId === id) {
this.pending.status = status;
return UpdateResult.UpdatedPending;
} else {
Expand Down
8 changes: 6 additions & 2 deletions packages/discv5/src/rateLimit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,19 @@ export class RateLimiter implements IRateLimiter {
}

if (!this.rateLimiterIP.allows(ip, 1)) {
this.metrics?.rateLimitHitIP.inc();
if (this.metrics) {
this.metrics.rateLimitHitIP.inc();
}

this.bannedIPs.set(ip, Date.now());

return false;
}

if (!this.rateLimiterGlobal.allows(null, 1)) {
this.metrics?.rateLimitHitTotal.inc();
if (this.metrics) {
this.metrics.rateLimitHitTotal.inc();
}
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/discv5/src/service/addrVotes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class AddrVotes {
const socketAddrStr = serializeSocketAddr(ip);

const prevVote = this.votes.get(voter);
if (prevVote?.socketAddrStr === socketAddrStr) {
if (prevVote && prevVote.socketAddrStr === socketAddrStr) {
// Same vote, ignore
return false;
} else if (prevVote !== undefined) {
Expand Down
70 changes: 47 additions & 23 deletions packages/discv5/src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const enr = this.findEnr(peer);
if (!enr || !getSocketAddressMultiaddrOnENR(enr, this.ipMode)) {
log("Lookup %s requested an unknown ENR or ENR w/o UDP", lookupId);
this.activeLookups.get(lookupId)?.onFailure(peer);
const lookup = this.activeLookups.get(lookupId);
if (lookup) lookup.onFailure(peer);
return;
}

Expand All @@ -512,7 +513,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Sending %s to node: %o", MessageType[activeRequest.request.type], nodeAddr);
try {
this.sessionService.sendRequest(activeRequest.contact, activeRequest.request);
this.metrics?.sentMessageCount.inc({ type: MessageType[activeRequest.request.type] });
if (this.metrics) {
this.metrics.sentMessageCount.inc({ type: MessageType[activeRequest.request.type] });
}
} catch (e) {
this.activeRequests.delete(bytesToBigint(activeRequest.request.id));
log("Error sending RPC to node: %o, :Error: %s", nodeAddr, (e as Error).message);
Expand All @@ -526,7 +529,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Sending %s to node: %o", MessageType[response.type], nodeAddr);
try {
this.sessionService.sendResponse(nodeAddr, response);
this.metrics?.sentMessageCount.inc({ type: MessageType[response.type] });
if (this.metrics) {
this.metrics.sentMessageCount.inc({ type: MessageType[response.type] });
}
} catch (e) {
log("Error sending RPC to node: %o, :Error: %s", nodeAddr, (e as Error).message);
}
Expand Down Expand Up @@ -735,7 +740,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
*/
private handleRpcRequest = (nodeAddr: INodeAddress, request: RequestMessage): void => {
const requestType = MessageType[request.type];
this.metrics?.rcvdMessageCount.inc({ type: requestType });
if (this.metrics) {
this.metrics.rcvdMessageCount.inc({ type: requestType });
}

try {
switch (request.type) {
Expand Down Expand Up @@ -777,7 +784,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Sending PONG response to node: %o", nodeAddr);
try {
this.sessionService.sendResponse(nodeAddr, pongMessage);
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.PONG] });
if (this.metrics) {
this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.PONG] });
}
} catch (e) {
log("Failed to send Pong. Error %s", (e as Error).message);
}
Expand Down Expand Up @@ -809,7 +818,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Sending empty NODES response to %o", nodeAddr);
try {
this.sessionService.sendResponse(nodeAddr, createNodesMessage(id, 1, nodes));
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
if (this.metrics) {
this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
}
} catch (e) {
log("Failed to send a NODES response. Error: %s", (e as Error).message);
}
Expand All @@ -828,7 +839,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const _nodes = nodes.slice(i, i + nodesPerPacket);
try {
this.sessionService.sendResponse(nodeAddr, createNodesMessage(id, total, _nodes));
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
if (this.metrics) {
this.metrics.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
}
} catch (e) {
log("Failed to send a NODES response. Error: %s", (e as Error).message);
}
Expand All @@ -847,7 +860,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
*/
private handleRpcResponse = (nodeAddr: INodeAddress, response: ResponseMessage): void => {
const responseType = MessageType[response.type];
this.metrics?.rcvdMessageCount.inc({ type: responseType });
if (this.metrics) {
this.metrics.rcvdMessageCount.inc({ type: responseType });
}

// verify we know of the rpc id

Expand All @@ -867,21 +882,25 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
nodeAddr,
response.id
);
activeRequest.callbackPromise?.reject(
new CodeError(
"Received a response from an nexpected address",
ResponseErrorType[ResponseErrorType.WrongAddress]
)
);
if (activeRequest.callbackPromise) {
activeRequest.callbackPromise.reject(
new CodeError(
"Received a response from an nexpected address",
ResponseErrorType[ResponseErrorType.WrongAddress]
)
);
}
return;
}

// Check that the response type matches the request
if (!requestMatchesResponse(activeRequest.request, response)) {
log("Node gave an incorrect response type. Ignoring response from: %o", nodeAddr);
activeRequest.callbackPromise?.reject(
new CodeError("Response has incorrect response type", ResponseErrorType[ResponseErrorType.WrongResponseType])
);
if (activeRequest.callbackPromise) {
activeRequest.callbackPromise.reject(
new CodeError("Response has incorrect response type", ResponseErrorType[ResponseErrorType.WrongResponseType])
);
}
return;
}

Expand All @@ -901,13 +920,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
// TODO Implement all RPC methods
return;
}

activeRequest.callbackPromise?.resolve(toResponseType(response));
if (activeRequest.callbackPromise) {
activeRequest.callbackPromise.resolve(toResponseType(response));
}
} catch (e) {
log("Error handling rpc response: node: %o response: %s", nodeAddr, responseType);
activeRequest.callbackPromise?.reject(
new CodeError((e as Error).message, ResponseErrorType[ResponseErrorType.InternalError])
);
if (activeRequest.callbackPromise) {
activeRequest.callbackPromise.reject(
new CodeError((e as Error).message, ResponseErrorType[ResponseErrorType.InternalError])
);
}
}
};

Expand Down Expand Up @@ -1056,6 +1078,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
this.connectionUpdated(nodeId, { type: ConnectionStatusType.Disconnected });

// If this is initiated by the user, return the error on the callback.
callbackPromise?.reject(new CodeError("RPC failure", RequestErrorType[error]));
if (callbackPromise) {
callbackPromise.reject(new CodeError("RPC failure", RequestErrorType[error]));
}
};
}
14 changes: 10 additions & 4 deletions packages/discv5/src/session/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
// table (remote_enr is None) then we re-request the ENR to keep the session up to date.

// send the challenge
const enrSeq = remoteEnr?.seq ?? 0n;
const enrSeq = (remoteEnr && remoteEnr.seq) ?? 0n;
const packet = createWhoAreYouPacket(nonce, enrSeq);
const challengeData = encodeChallengeData(packet.maskingIv, packet.header);

Expand Down Expand Up @@ -485,7 +485,9 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
const enrMultiaddrIP6 = getSocketAddressMultiaddrOnENR(enr, { ...this.ipMode, ip4: false } as IPMode);
return (
enr.nodeId === nodeAddr.nodeId &&
(enrMultiaddrIP4?.equals(nodeAddr.socketAddr) ?? enrMultiaddrIP6?.equals(nodeAddr.socketAddr) ?? true)
((enrMultiaddrIP4 && enrMultiaddrIP4.equals(nodeAddr.socketAddr)) ??
(enrMultiaddrIP6 && enrMultiaddrIP6.equals(nodeAddr.socketAddr)) ??
true)
);
}

Expand Down Expand Up @@ -759,11 +761,15 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
}

private removeExpectedResponse(socketAddr: Multiaddr): void {
this.transport.addExpectedResponse?.(socketAddr.toOptions().host);
if (this.transport.addExpectedResponse) {
this.transport.addExpectedResponse(socketAddr.toOptions().host);
}
}

private addExpectedResponse(socketAddr: Multiaddr): void {
this.transport.removeExpectedResponse?.(socketAddr.toOptions().host);
if (this.transport.removeExpectedResponse) {
this.transport.removeExpectedResponse(socketAddr.toOptions().host);
}
}

private handleRequestTimeout(nodeAddr: INodeAddress, requestCall: IRequestCall): void {
Expand Down
51 changes: 30 additions & 21 deletions packages/discv5/src/transport/udp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,29 @@ export class UDPTransportService
}

public async start(): Promise<void> {
const [socket4, socket6] = await Promise.all([
this.ip4 ? openSocket(this.ip4.opts) : undefined,
this.ip6 ? openSocket(this.ip6.opts) : undefined,
]);
if (this.ip4) {
socket4?.on("message", this.handleIncoming);
this.ip4.socket = socket4;
}
if (this.ip6) {
socket6?.on("message", this.handleIncoming);
this.ip6.socket = socket6;
}
const configs: SocketOpts[] = [];
if (this.ip4) configs.push(this.ip4);
if (this.ip6) configs.push(this.ip6);

const sockets = await Promise.all(configs.map((c) => openSocket(c.opts)));

configs.forEach((config, index) => {
const socket = sockets[index];
socket.on("message", this.handleIncoming);
config.socket = socket;
});
}

public async stop(): Promise<void> {
const socket4 = this.ip4?.socket;
const socket6 = this.ip6?.socket;
socket4?.off("message", this.handleIncoming);
socket6?.off("message", this.handleIncoming);
await Promise.all([closeSocket(socket4), closeSocket(socket6)]);
const sockets: dgram.Socket[] = [];
if (this.ip4 && this.ip4.socket) sockets.push(this.ip4.socket);
if (this.ip6 && this.ip6.socket) sockets.push(this.ip6.socket);

sockets.forEach((socket) => {
socket.off("message", this.handleIncoming);
});

await Promise.all(sockets.map((socket) => closeSocket(socket)));
}

public async send(to: Multiaddr, toId: string, packet: IPacket): Promise<void> {
Expand All @@ -114,21 +117,27 @@ export class UDPTransportService
if (!this.ip4) {
throw new Error("Cannot send to an IPv4 address without a bound IPv4 socket");
}
this.ip4.socket?.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host);
if (this.ip4.socket) {
this.ip4.socket.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host);
}
} else if (nodeAddr.family === 6) {
if (!this.ip6) {
throw new Error("Cannot send to an IPv6 address without a bound IPv6 socket");
}
this.ip6.socket?.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host);
if (this.ip6.socket) {
this.ip6.socket.send(encodePacket(toId, packet), nodeAddr.port, nodeAddr.host);
}
}
}

addExpectedResponse(ipAddress: string): void {
this.rateLimiter?.addExpectedResponse(ipAddress);
if (!this.rateLimiter) return;
this.rateLimiter.addExpectedResponse(ipAddress);
}

removeExpectedResponse(ipAddress: string): void {
this.rateLimiter?.removeExpectedResponse(ipAddress);
if (!this.rateLimiter) return;
this.rateLimiter.removeExpectedResponse(ipAddress);
}

getContactableAddr(enr: ENR): SocketAddress | undefined {
Expand Down