From c5e4a82b4a78f262b00d49d5a05c36d452075f1c Mon Sep 17 00:00:00 2001 From: NatalieShaked Date: Mon, 17 Feb 2025 14:18:53 +0200 Subject: [PATCH 1/3] fix: liveness --- src/common/utils.ts | 70 ++++++++++++++++++++++++++---------------- src/containerConfig.ts | 10 +++++- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/src/common/utils.ts b/src/common/utils.ts index f592234..3442308 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -1,34 +1,52 @@ -import { DependencyContainer, FactoryFunction } from 'tsyringe'; +import { DependencyContainer } from 'tsyringe'; import { Logger } from '@map-colonies/js-logger'; +import { HealthCheck } from '@godaddy/terminus'; import { RedisClient } from '../redis/index'; import { SERVICES } from './constants'; -export const healthCheckFactory: FactoryFunction = (container: DependencyContainer): void => { - const logger = container.resolve(SERVICES.LOGGER); - const geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS); - const ttlRedis = container.resolve(SERVICES.TTL_REDIS); +export const healthCheckFactory = (container: DependencyContainer): HealthCheck => { + return async (): Promise => { + const logger = container.resolve(SERVICES.LOGGER); + const geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS); + const ttlRedis = container.resolve(SERVICES.TTL_REDIS); - geocodingRedis - .ping() - .then(() => { - return; - }) - .catch((error: Error) => { - logger.error({ - message: `Healthcheck failed for GeocodingRedis.`, - error, - }); - }); + const promises: Promise[] = []; + + promises.push( + new Promise((resolve, reject) => { + geocodingRedis + .ping() + .then(() => { + resolve('Healthcheck passed for GeocodingRedis connection'); + }) + .catch((error: Error) => { + reject({ + msg: `Healthcheck failed for GeocodingRedis.`, + error, + }); + }); + }) + ); + promises.push( + new Promise((resolve, reject) => { + ttlRedis + .ping() + .then(() => { + resolve('Healthcheck passed for TTLRedis connection'); + }) + .catch((error: Error) => { + reject({ + msg: `Healthcheck failed for TTLRedis.`, + error, + }); + }); + }) + ); - ttlRedis - .ping() - .then(() => { - return; - }) - .catch((error: Error) => { - logger.error({ - message: `Healthcheck failed for GeocodingRedis.`, - error, - }); + await Promise.allSettled(promises).then((results) => { + results.forEach((msg) => logger.info({ msg })); }); + + return Promise.resolve(); + }; }; diff --git a/src/containerConfig.ts b/src/containerConfig.ts index 675945e..f82128a 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -2,6 +2,7 @@ import config from 'config'; import { Producer } from 'kafkajs'; import { getOtelMixin } from '@map-colonies/telemetry'; import { trace, metrics as OtelMetrics } from '@opentelemetry/api'; +import { HealthCheck } from '@godaddy/terminus'; import { DependencyContainer } from 'tsyringe/dist/typings/types'; import jsLogger, { LoggerOptions } from '@map-colonies/js-logger'; import { CleanupRegistry } from '@map-colonies/cleanup-registry'; @@ -94,7 +95,14 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise } }, }, - { token: HEALTHCHECK, provider: { useFactory: healthCheckFactory } }, + { + token: HEALTHCHECK, + provider: { + useFactory: (depContainer): HealthCheck => { + return healthCheckFactory(depContainer); + }, + }, + }, { token: REDIS_SUB, provider: { From 0629f791297cfc04e4a4cb9c03af6306489a0e89 Mon Sep 17 00:00:00 2001 From: NatalieShaked Date: Wed, 19 Feb 2025 13:25:40 +0200 Subject: [PATCH 2/3] fix: Redis subscriber configuration --- src/containerConfig.ts | 7 ++-- src/index.ts | 10 +++--- src/redis/subscribe.ts | 76 +++++++++++++++++++++++++++++++----------- 3 files changed, 65 insertions(+), 28 deletions(-) diff --git a/src/containerConfig.ts b/src/containerConfig.ts index f82128a..a5dc9d1 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -8,14 +8,13 @@ import jsLogger, { LoggerOptions } from '@map-colonies/js-logger'; import { CleanupRegistry } from '@map-colonies/cleanup-registry'; import { Metrics } from '@map-colonies/telemetry'; import { instancePerContainerCachingFactory } from 'tsyringe'; -import { createClient } from 'redis'; import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, REDIS_CLIENT_FACTORY, REDIS_SUB, SERVICES, SERVICE_NAME } from './common/constants'; import { tracing } from './common/tracing'; import { feedbackRouterFactory, FEEDBACK_ROUTER_SYMBOL } from './feedback/routes/feedbackRouter'; import { InjectionObject, registerDependencies } from './common/dependencyRegistration'; import { RedisClient, RedisClientFactory } from './redis'; import { kafkaClientFactory } from './kafka'; -import { redisSubscribe } from './redis/subscribe'; +import { createRedisClient, redisSubscribe } from './redis/subscribe'; import { healthCheckFactory } from './common/utils'; export interface RegisterOptions { @@ -107,14 +106,14 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise token: REDIS_SUB, provider: { useFactory: instancePerContainerCachingFactory((): RedisClient => { - const subscriber = createClient(); + const subscriber = createRedisClient(config, logger); return subscriber; }), }, postInjectionHook: async (deps: DependencyContainer): Promise => { const subscriber = deps.resolve(REDIS_SUB); cleanupRegistry.register({ - func: async () => { + func: async (): Promise => { await subscriber.quit(); return Promise.resolve(); }, diff --git a/src/index.ts b/src/index.ts index b11700e..cea38d6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,11 +15,11 @@ void getApp() .then(({ app, container }) => { depContainer = container; - const logger = container.resolve(SERVICES.LOGGER); + const logger = depContainer.resolve(SERVICES.LOGGER); const server = createTerminus(createServer(app), { // eslint-disable-next-line @typescript-eslint/naming-convention - healthChecks: { '/liveness': container.resolve(HEALTHCHECK) }, - onSignal: container.resolve(ON_SIGNAL), + healthChecks: { '/liveness': depContainer.resolve(HEALTHCHECK) }, + onSignal: depContainer.resolve(ON_SIGNAL), }); server.listen(port, () => { @@ -28,12 +28,12 @@ void getApp() }) .catch(async (error: Error) => { const errorLogger = - depContainer?.isRegistered(SERVICES.LOGGER) == true + depContainer?.isRegistered(SERVICES.LOGGER) === true ? depContainer.resolve(SERVICES.LOGGER).error.bind(depContainer.resolve(SERVICES.LOGGER)) : console.error; errorLogger({ msg: '😢 - failed initializing the server', err: error }); - if (depContainer?.isRegistered(ON_SIGNAL) == true) { + if (depContainer?.isRegistered(ON_SIGNAL) === true) { const shutDown: () => Promise = depContainer.resolve(ON_SIGNAL); await shutDown(); } diff --git a/src/redis/subscribe.ts b/src/redis/subscribe.ts index c5bd181..303e5c3 100644 --- a/src/redis/subscribe.ts +++ b/src/redis/subscribe.ts @@ -1,11 +1,43 @@ +import { readFileSync } from 'fs'; import { Logger } from '@map-colonies/js-logger'; import { DependencyContainer } from 'tsyringe'; import { Producer } from 'kafkajs'; +import { createClient, RedisClientOptions } from 'redis'; import { REDIS_SUB, SERVICES } from '../common/constants'; -import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfaces'; +import { IConfig, FeedbackResponse, GeocodingResponse, RedisConfig } from '../common/interfaces'; import { NotFoundError } from '../common/errors'; import { RedisClient } from '../redis/index'; +const createConnectionOptions = (redisConfig: RedisConfig): Partial => { + const { host, port, enableSslAuth, sslPaths, databases, ...clientOptions } = redisConfig; + clientOptions.socket = { host, port }; + if (enableSslAuth) { + clientOptions.socket = { + ...clientOptions.socket, + tls: true, + key: sslPaths.key !== '' ? readFileSync(sslPaths.key) : undefined, + cert: sslPaths.cert !== '' ? readFileSync(sslPaths.cert) : undefined, + ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined, + }; + } + return clientOptions; +}; + +export const createRedisClient = (config: IConfig, logger: Logger): RedisClient => { + const dbConfig = config.get('redis'); + + const connectionOptions = createConnectionOptions(dbConfig); + + const redisClient = createClient(connectionOptions) + .on('error', (error: Error) => logger.error({ msg: 'redis client errored', err: error })) + .on('reconnecting', (...args) => logger.warn({ msg: 'redis client reconnecting', ...args })) + .on('end', (...args) => logger.info({ msg: 'redis client end', ...args })) + .on('connect', (...args) => logger.debug({ msg: 'redis client connected', ...args })) + .on('ready', (...args) => logger.debug({ msg: 'redis client is ready', ...args })); + + return redisClient; +}; + export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise => { const topic = config.get('outputTopic'); logger.info(`Kafka send message. Topic: ${topic}`); @@ -29,29 +61,35 @@ export const redisSubscribe = async (deps: DependencyContainer): Promise(SERVICES.LOGGER); const subscriber = deps.resolve(REDIS_SUB); + logger.debug('Redis subscriber init'); const ttlDB = config.get('redis.databases.ttlIndex'); const geocodingDB = config.get('redis.databases.geocodingIndex'); const redisTTL = config.get('redis.ttl'); - await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => { - logger.info(`Redis: Got new request ${message}`); - // eslint-disable-next-line @typescript-eslint/naming-convention - await ttlRedis.set(message, '', { EX: redisTTL }); - }); + try { + await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => { + logger.info(`Redis: Got new request ${message}`); + // eslint-disable-next-line @typescript-eslint/naming-convention + await ttlRedis.set(message, '', { EX: redisTTL }); + }); - await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => { - let wasUsed; - const redisResponse = (await geocodingRedis.get(message)) as string; - if (redisResponse) { - const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; - wasUsed = geocodingResponse.wasUsed; - } - if (!(wasUsed ?? false)) { - await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis); - } - await geocodingRedis.del(message); - }); - return subscriber; + await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => { + let wasUsed; + const redisResponse = (await geocodingRedis.get(message)) as string; + if (redisResponse) { + const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; + wasUsed = geocodingResponse.wasUsed; + } + if (!(wasUsed ?? false)) { + await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis); + } + await geocodingRedis.del(message); + }); + return subscriber; + } catch (error) { + logger.error({ msg: `Redis Subscriber Error: ${(error as Error).message}` }); + throw error; + } }; export const sendNoChosenResult = async ( From b51354d6e2102a32c7d8842a4aa82b048f34ed5a Mon Sep 17 00:00:00 2001 From: NatalieShaked Date: Wed, 19 Feb 2025 14:29:43 +0200 Subject: [PATCH 3/3] chore: remove duplicated redis init --- src/common/utils.ts | 2 +- src/containerConfig.ts | 38 +++++++-------------- src/redis/subscribe.ts | 75 +++++++++++------------------------------- 3 files changed, 32 insertions(+), 83 deletions(-) diff --git a/src/common/utils.ts b/src/common/utils.ts index 3442308..d7ad0bd 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -44,7 +44,7 @@ export const healthCheckFactory = (container: DependencyContainer): HealthCheck ); await Promise.allSettled(promises).then((results) => { - results.forEach((msg) => logger.info({ msg })); + results.forEach((msg) => logger.debug({ msg })); }); return Promise.resolve(); diff --git a/src/containerConfig.ts b/src/containerConfig.ts index a5dc9d1..b99a215 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -12,9 +12,9 @@ import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, REDIS_CLIENT_FACTORY, REDIS_S import { tracing } from './common/tracing'; import { feedbackRouterFactory, FEEDBACK_ROUTER_SYMBOL } from './feedback/routes/feedbackRouter'; import { InjectionObject, registerDependencies } from './common/dependencyRegistration'; -import { RedisClient, RedisClientFactory } from './redis'; +import { RedisClientFactory } from './redis'; import { kafkaClientFactory } from './kafka'; -import { createRedisClient, redisSubscribe } from './redis/subscribe'; +import { redisSubscribe } from './redis/subscribe'; import { healthCheckFactory } from './common/utils'; export interface RegisterOptions { @@ -79,7 +79,8 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise provider: { useClass: RedisClientFactory }, postInjectionHook: async (deps: DependencyContainer): Promise => { const redisFactory = deps.resolve(REDIS_CLIENT_FACTORY); - for (const redisIndex of [SERVICES.GEOCODING_REDIS, SERVICES.TTL_REDIS]) { + + for (const redisIndex of [SERVICES.GEOCODING_REDIS, SERVICES.TTL_REDIS, REDIS_SUB]) { const redis = redisFactory.createRedisClient(redisIndex); deps.register(redisIndex, { useValue: redis }); cleanupRegistry.register({ @@ -90,7 +91,14 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise id: redisIndex, }); await redis.connect(); - logger.info(`Connected to ${redisIndex.toString()}`); + + let redisName = redisIndex.toString(); + redisName = redisName.substring(redisName.indexOf('(') + 1, redisName.lastIndexOf(')')); + logger.info(`Connected to ${redisName}`); + + if (redisIndex === REDIS_SUB) { + await redisSubscribe(deps); + } } }, }, @@ -102,28 +110,6 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise }, }, }, - { - token: REDIS_SUB, - provider: { - useFactory: instancePerContainerCachingFactory((): RedisClient => { - const subscriber = createRedisClient(config, logger); - return subscriber; - }), - }, - postInjectionHook: async (deps: DependencyContainer): Promise => { - const subscriber = deps.resolve(REDIS_SUB); - cleanupRegistry.register({ - func: async (): Promise => { - await subscriber.quit(); - return Promise.resolve(); - }, - id: REDIS_SUB, - }); - await subscriber.connect(); - logger.info('Connected to Redis Subscriber'); - await redisSubscribe(deps); - }, - }, { token: ON_SIGNAL, provider: { diff --git a/src/redis/subscribe.ts b/src/redis/subscribe.ts index 303e5c3..fc72cf6 100644 --- a/src/redis/subscribe.ts +++ b/src/redis/subscribe.ts @@ -1,43 +1,11 @@ -import { readFileSync } from 'fs'; import { Logger } from '@map-colonies/js-logger'; import { DependencyContainer } from 'tsyringe'; import { Producer } from 'kafkajs'; -import { createClient, RedisClientOptions } from 'redis'; import { REDIS_SUB, SERVICES } from '../common/constants'; -import { IConfig, FeedbackResponse, GeocodingResponse, RedisConfig } from '../common/interfaces'; +import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfaces'; import { NotFoundError } from '../common/errors'; import { RedisClient } from '../redis/index'; -const createConnectionOptions = (redisConfig: RedisConfig): Partial => { - const { host, port, enableSslAuth, sslPaths, databases, ...clientOptions } = redisConfig; - clientOptions.socket = { host, port }; - if (enableSslAuth) { - clientOptions.socket = { - ...clientOptions.socket, - tls: true, - key: sslPaths.key !== '' ? readFileSync(sslPaths.key) : undefined, - cert: sslPaths.cert !== '' ? readFileSync(sslPaths.cert) : undefined, - ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined, - }; - } - return clientOptions; -}; - -export const createRedisClient = (config: IConfig, logger: Logger): RedisClient => { - const dbConfig = config.get('redis'); - - const connectionOptions = createConnectionOptions(dbConfig); - - const redisClient = createClient(connectionOptions) - .on('error', (error: Error) => logger.error({ msg: 'redis client errored', err: error })) - .on('reconnecting', (...args) => logger.warn({ msg: 'redis client reconnecting', ...args })) - .on('end', (...args) => logger.info({ msg: 'redis client end', ...args })) - .on('connect', (...args) => logger.debug({ msg: 'redis client connected', ...args })) - .on('ready', (...args) => logger.debug({ msg: 'redis client is ready', ...args })); - - return redisClient; -}; - export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise => { const topic = config.get('outputTopic'); logger.info(`Kafka send message. Topic: ${topic}`); @@ -66,30 +34,25 @@ export const redisSubscribe = async (deps: DependencyContainer): Promise('redis.databases.geocodingIndex'); const redisTTL = config.get('redis.ttl'); - try { - await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => { - logger.info(`Redis: Got new request ${message}`); - // eslint-disable-next-line @typescript-eslint/naming-convention - await ttlRedis.set(message, '', { EX: redisTTL }); - }); + await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => { + logger.info(`Redis: Got new request ${message}`); + // eslint-disable-next-line @typescript-eslint/naming-convention + await ttlRedis.set(message, '', { EX: redisTTL }); + }); - await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => { - let wasUsed; - const redisResponse = (await geocodingRedis.get(message)) as string; - if (redisResponse) { - const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; - wasUsed = geocodingResponse.wasUsed; - } - if (!(wasUsed ?? false)) { - await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis); - } - await geocodingRedis.del(message); - }); - return subscriber; - } catch (error) { - logger.error({ msg: `Redis Subscriber Error: ${(error as Error).message}` }); - throw error; - } + await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => { + let wasUsed; + const redisResponse = (await geocodingRedis.get(message)) as string; + if (redisResponse) { + const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; + wasUsed = geocodingResponse.wasUsed; + } + if (!(wasUsed ?? false)) { + await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis); + } + await geocodingRedis.del(message); + }); + return subscriber; }; export const sendNoChosenResult = async (