Skip to content

Commit 064d78d

Browse files
committed
feat(client): Add Redis Enterprise maintenance handling capabilities
- Introduced `EnterpriseMaintenanceManager` to manage Redis Enterprise maintenance events and push notifications. - Integrated `EnterpriseMaintenanceManager` into `RedisClient` to handle maintenance push notifications and manage socket transitions. - Implemented graceful handling of MOVING, MIGRATING, and FAILOVER push notifications, including socket replacement and timeout adjustments.
1 parent f652d1e commit 064d78d

File tree

4 files changed

+341
-3
lines changed

4 files changed

+341
-3
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/ty
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8+
import { dbgMaintenance } from './enterprise-maintenance-manager';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -79,6 +80,7 @@ export default class RedisCommandsQueue {
7980
#maintenanceCommandTimeout: number | undefined
8081

8182
setMaintenanceCommandTimeout(ms: number | undefined) {
83+
dbgMaintenance(`Setting maintenance command timeout to ${ms}`);
8284
// Prevent possible api misuse
8385
if (this.#maintenanceCommandTimeout === ms) return;
8486

@@ -112,6 +114,7 @@ export default class RedisCommandsQueue {
112114
};
113115
signal.addEventListener('abort', command.timeout.listener, { once: true });
114116
};
117+
dbgMaintenance(`Total of ${counter} timeouts reset to ${ms}`);
115118
}
116119

117120
get isPubSubActive() {
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
import { RedisClientOptions } from ".";
2+
import RedisCommandsQueue from "./commands-queue";
3+
import { RedisArgument } from "../..";
4+
import { isIP } from "net";
5+
import { lookup } from "dns/promises";
6+
import assert from "node:assert";
7+
import { setTimeout } from "node:timers/promises";
8+
import RedisSocket from "./socket";
9+
10+
export const MAINTENANCE_EVENTS = {
11+
PAUSE_WRITING: "pause-writing",
12+
RESUME_WRITING: "resume-writing",
13+
TIMEOUTS_UPDATE: "timeouts-update",
14+
} as const;
15+
16+
const PN = {
17+
MOVING: "MOVING",
18+
MIGRATING: "MIGRATING",
19+
MIGRATED: "MIGRATED",
20+
FAILING_OVER: "FAILING_OVER",
21+
FAILED_OVER: "FAILED_OVER",
22+
};
23+
24+
export const dbgMaintenance = (...args: any[]) => {
25+
if (!process.env.DEBUG_MAINTENANCE) return;
26+
return console.log("[MNT]", ...args);
27+
};
28+
29+
export interface MaintenanceUpdate {
30+
inMaintenance: boolean;
31+
relaxedCommandTimeout?: number;
32+
relaxedSocketTimeout?: number;
33+
}
34+
35+
interface Client {
36+
_ejectSocket: () => RedisSocket;
37+
_insertSocket: (socket: RedisSocket) => void;
38+
_pause: () => void;
39+
_unpause: () => void;
40+
_maintenanceUpdate: (update: MaintenanceUpdate) => void;
41+
duplicate: (options: RedisClientOptions) => Client;
42+
connect: () => Promise<Client>;
43+
destroy: () => void;
44+
}
45+
46+
export default class EnterpriseMaintenanceManager {
47+
#commandsQueue: RedisCommandsQueue;
48+
#options: RedisClientOptions;
49+
#isMaintenance = 0;
50+
#client: Client;
51+
52+
static setupDefaultMaintOptions(options: RedisClientOptions) {
53+
if (options.maintPushNotifications === undefined) {
54+
options.maintPushNotifications =
55+
options?.RESP === 3 ? "auto" : "disabled";
56+
}
57+
if (options.maintMovingEndpointType === undefined) {
58+
options.maintMovingEndpointType = "auto";
59+
}
60+
if (options.maintRelaxedSocketTimeout === undefined) {
61+
options.maintRelaxedSocketTimeout = 10000;
62+
}
63+
if (options.maintRelaxedCommandTimeout === undefined) {
64+
options.maintRelaxedCommandTimeout = 10000;
65+
}
66+
}
67+
68+
static async getHandshakeCommand(
69+
tls: boolean,
70+
host: string,
71+
options: RedisClientOptions,
72+
): Promise<
73+
| { cmd: Array<RedisArgument>; errorHandler: (error: Error) => void }
74+
| undefined
75+
> {
76+
if (options.maintPushNotifications === "disabled") return;
77+
78+
const movingEndpointType = await determineEndpoint(tls, host, options);
79+
return {
80+
cmd: [
81+
"CLIENT",
82+
"MAINT_NOTIFICATIONS",
83+
"ON",
84+
"moving-endpoint-type",
85+
movingEndpointType,
86+
],
87+
errorHandler: (error: Error) => {
88+
dbgMaintenance("handshake failed:", error);
89+
if (options.maintPushNotifications === "enabled") {
90+
throw error;
91+
}
92+
},
93+
};
94+
}
95+
96+
constructor(
97+
commandsQueue: RedisCommandsQueue,
98+
client: Client,
99+
options: RedisClientOptions,
100+
) {
101+
this.#commandsQueue = commandsQueue;
102+
this.#options = options;
103+
this.#client = client;
104+
105+
this.#commandsQueue.addPushHandler(this.#onPush);
106+
}
107+
108+
#onPush = (push: Array<any>): boolean => {
109+
dbgMaintenance("ONPUSH:", push.map(String));
110+
switch (push[0].toString()) {
111+
case PN.MOVING: {
112+
// [ 'MOVING', '17', '15', '54.78.247.156:12075' ]
113+
// ^seq ^after ^new ip
114+
const afterSeconds = push[2];
115+
const url: string | null = push[3] ? String(push[3]) : null;
116+
dbgMaintenance("Received MOVING:", afterSeconds, url);
117+
this.#onMoving(afterSeconds, url);
118+
return true;
119+
}
120+
case PN.MIGRATING:
121+
case PN.FAILING_OVER: {
122+
dbgMaintenance("Received MIGRATING|FAILING_OVER");
123+
this.#onMigrating();
124+
return true;
125+
}
126+
case PN.MIGRATED:
127+
case PN.FAILED_OVER: {
128+
dbgMaintenance("Received MIGRATED|FAILED_OVER");
129+
this.#onMigrated();
130+
return true;
131+
}
132+
}
133+
return false;
134+
};
135+
136+
// Queue:
137+
// toWrite [ C D E ]
138+
// waitingForReply [ A B ] - aka In-flight commands
139+
//
140+
// time: ---1-2---3-4-5-6---------------------------
141+
//
142+
// 1. [EVENT] MOVING PN received
143+
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
144+
// 3. [EVENT] New socket connected
145+
// 4. [EVENT] In-flight commands completed
146+
// 5. [ACTION] Destroy old socket
147+
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
148+
#onMoving = async (
149+
afterSeconds: number,
150+
url: string | null,
151+
): Promise<void> => {
152+
// 1 [EVENT] MOVING PN received
153+
this.#onMigrating();
154+
155+
let host: string;
156+
let port: number;
157+
158+
// The special value `none` indicates that the `MOVING` message doesn’t need
159+
// to contain an endpoint. Instead it contains the value `null` then. In
160+
// such a corner case, the client is expected to schedule a graceful
161+
// reconnect to its currently configured endpoint after half of the grace
162+
// period that was communicated by the server is over.
163+
if (url === null) {
164+
assert(this.#options.maintMovingEndpointType === "none");
165+
assert(this.#options.socket !== undefined);
166+
assert("host" in this.#options.socket);
167+
assert(typeof this.#options.socket.host === "string");
168+
host = this.#options.socket.host;
169+
assert(typeof this.#options.socket.port === "number");
170+
port = this.#options.socket.port;
171+
const waitTime = (afterSeconds * 1000) / 2;
172+
dbgMaintenance(`Wait for ${waitTime}ms`);
173+
await setTimeout(waitTime);
174+
} else {
175+
const split = url.split(":");
176+
host = split[0];
177+
port = Number(split[1]);
178+
}
179+
180+
// 2 [ACTION] Pause writing
181+
dbgMaintenance("Pausing writing of new commands to old socket");
182+
this.#client._pause();
183+
184+
const tmpClient = this.#client.duplicate({
185+
maintPushNotifications: "disabled",
186+
socket: {
187+
...this.#options.socket,
188+
host,
189+
port,
190+
},
191+
});
192+
193+
dbgMaintenance(`Connecting tmp client: ${host}:${port}`);
194+
await tmpClient.connect();
195+
dbgMaintenance(`Connected to tmp client`);
196+
// 3 [EVENT] New socket connected
197+
198+
//TODO
199+
// dbgMaintenance(
200+
// `Set timeout for new socket to ${this.#options.maintRelaxedSocketTimeout}`,
201+
// );
202+
// newSocket.setMaintenanceTimeout(this.#options.maintRelaxedSocketTimeout);
203+
204+
dbgMaintenance(`Wait for all in-flight commands to complete`);
205+
await this.#commandsQueue.waitForInflightCommandsToComplete();
206+
dbgMaintenance(`In-flight commands completed`);
207+
// 4 [EVENT] In-flight commands completed
208+
209+
dbgMaintenance("Swap client sockets...");
210+
const oldSocket = this.#client._ejectSocket();
211+
const newSocket = tmpClient._ejectSocket();
212+
this.#client._insertSocket(newSocket);
213+
tmpClient._insertSocket(oldSocket);
214+
tmpClient.destroy();
215+
dbgMaintenance("Swap client sockets done.");
216+
// 5 + 6
217+
dbgMaintenance("Resume writing");
218+
this.#client._unpause();
219+
this.#onMigrated();
220+
};
221+
222+
#onMigrating = async () => {
223+
this.#isMaintenance++;
224+
if (this.#isMaintenance > 1) {
225+
dbgMaintenance(`Timeout relaxation already done`);
226+
return;
227+
}
228+
229+
const update: MaintenanceUpdate = {
230+
inMaintenance: true,
231+
relaxedCommandTimeout: this.#options.maintRelaxedCommandTimeout,
232+
relaxedSocketTimeout: this.#options.maintRelaxedSocketTimeout,
233+
};
234+
235+
this.#client._maintenanceUpdate(update);
236+
};
237+
238+
#onMigrated = async () => {
239+
this.#isMaintenance--;
240+
assert(this.#isMaintenance >= 0);
241+
if (this.#isMaintenance > 0) {
242+
dbgMaintenance(`Not ready to unrelax timeouts yet`);
243+
return;
244+
}
245+
246+
const update: MaintenanceUpdate = {
247+
inMaintenance : false
248+
};
249+
250+
this.#client._maintenanceUpdate(update);
251+
};
252+
}
253+
254+
export type MovingEndpointType =
255+
| "auto"
256+
| "internal-ip"
257+
| "internal-fqdn"
258+
| "external-ip"
259+
| "external-fqdn"
260+
| "none";
261+
262+
function isPrivateIP(ip: string): boolean {
263+
const version = isIP(ip);
264+
if (version === 4) {
265+
const octets = ip.split(".").map(Number);
266+
return (
267+
octets[0] === 10 ||
268+
(octets[0] === 172 && octets[1] >= 16 && octets[1] <= 31) ||
269+
(octets[0] === 192 && octets[1] === 168)
270+
);
271+
}
272+
if (version === 6) {
273+
return (
274+
ip.startsWith("fc") || // Unique local
275+
ip.startsWith("fd") || // Unique local
276+
ip === "::1" || // Loopback
277+
ip.startsWith("fe80") // Link-local unicast
278+
);
279+
}
280+
return false;
281+
}
282+
283+
async function determineEndpoint(
284+
tlsEnabled: boolean,
285+
host: string,
286+
options: RedisClientOptions,
287+
): Promise<MovingEndpointType> {
288+
assert(options.maintMovingEndpointType !== undefined);
289+
if (options.maintMovingEndpointType !== "auto") {
290+
dbgMaintenance(
291+
`Determine endpoint type: ${options.maintMovingEndpointType}`,
292+
);
293+
return options.maintMovingEndpointType;
294+
}
295+
296+
const ip = isIP(host) ? host : (await lookup(host, { family: 0 })).address;
297+
298+
const isPrivate = isPrivateIP(ip);
299+
300+
let result: MovingEndpointType;
301+
if (tlsEnabled) {
302+
result = isPrivate ? "internal-fqdn" : "external-fqdn";
303+
} else {
304+
result = isPrivate ? "internal-ip" : "external-ip";
305+
}
306+
307+
dbgMaintenance(`Determine endpoint type: ${result}`);
308+
return result;
309+
}

0 commit comments

Comments
 (0)