diff --git a/README.md b/README.md index 05fd0e5..5ccbce8 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,12 @@ Geocoding's feedback API collects usage data from the [Geocoding API](https://gi Checkout the OpenAPI spec [here](/openapi3.yaml) ## Workflow -![workflow img](https://github.com/user-attachments/assets/7ed767ad-02dd-46f3-9de7-d2be72205e2c) +![workflow img](https://github.com/user-attachments/assets/63b8c7ed-4509-4dea-87ae-dc0cfc625ff9) ## How does it work? -Once a Geocoding user searches for something using Geocoding's api, it enters Redis (in the geocoding DB index), and an event is triggered. This event adds the `requestId` (from geocoding) to a different Redis DB with a ttl of 5 minutes (TTL DB index).

+Once a Geocoding user searches for something using Geocoding's api, it enters Redis, and an event is triggered. This event adds to the `requestId` (from geocoding) a prefix, and adds it also to Redis with a ttl.

When the Geocoding user chooses a result, the requesting system will then send the `request_id`, the `chosen_response_id`, and the `user_id` back to us using the [Feedback api](/openapi3.yaml).
-Once we get the chosen response from the api, we validate it and make sure it exists in the Redis geocoding DB, and once it is validated we add a `wasUsed = true` parameter to the geocoding response (in Redis) for later use. We then send the response to Kafka (where it will later be enriched and added to elastic -> see [Geocoding Enrichment](https://github.com/MapColonies/geocoding-enrichment) for more details).
+Once we get the chosen response from the api, we validate it and make sure it exists in the Redis, and once it is validated we add a `wasUsed = true` parameter to the geocoding response (in Redis) for later use. We then send the response to Kafka (where it will later be enriched and added to elastic -> see [Geocoding Enrichment](https://github.com/MapColonies/geocoding-enrichment) for more details).
Once the TTL of the `requestId` expires an even is triggered, and there are two options:
1. One or more responses were chosen. 2. No response was chosen (only searched for). diff --git a/config/custom-environment-variables.json b/config/custom-environment-variables.json index 699b4cc..a1873f5 100644 --- a/config/custom-environment-variables.json +++ b/config/custom-environment-variables.json @@ -56,10 +56,6 @@ "key": "REDIS_KEY_PATH", "cert": "REDIS_CERT_PATH" }, - "databases": { - "geocodingIndex": "REDIS_GEOCODING_INDEX", - "ttlIndex": "REDIS_TTL_INDEX" - }, "ttl": { "__name": "REDIS_TTL", "__format": "number" diff --git a/config/default.json b/config/default.json index ea07a3d..893588d 100644 --- a/config/default.json +++ b/config/default.json @@ -36,10 +36,6 @@ "key": "", "cert": "" }, - "databases": { - "geocodingIndex": 0, - "ttlIndex": 1 - }, "ttl": 300 }, "kafka": { diff --git a/config/test.json b/config/test.json index ba0e198..e3ef1e4 100644 --- a/config/test.json +++ b/config/test.json @@ -10,14 +10,10 @@ "key": "", "cert": "" }, - "databases": { - "geocodingIndex": 2, - "ttlIndex": 3 - }, "ttl": 2 }, "kafka": { - "brokers": "kafka:9092" + "brokers": "localhost:9092" }, "kafkaConsumer": {}, "kafkaProducer": {}, diff --git a/helm/templates/configmap.yaml b/helm/templates/configmap.yaml index d1328d0..a2af10a 100644 --- a/helm/templates/configmap.yaml +++ b/helm/templates/configmap.yaml @@ -21,7 +21,6 @@ data: {{- with .Values.redisConfig }} REDIS_HOST: {{ .host }} - REDIS_DATABASE: {{ .database | quote}} REDIS_PORT: {{ .port | quote }} {{- if .sslAuth.enabled }} REDIS_ENABLE_SSL_AUTH: "true" diff --git a/helm/values.yaml b/helm/values.yaml index 2cdf1a3..483e480 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -75,7 +75,6 @@ redisConfig: host: localhost username: "" password: "" - database: 0 port: 6379 sslAuth: enabled: false diff --git a/src/common/constants.ts b/src/common/constants.ts index 96c5961..be3ab0b 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -12,8 +12,7 @@ export const SERVICES = { CONFIG: Symbol('Config'), TRACER: Symbol('Tracer'), METER: Symbol('Meter'), - GEOCODING_REDIS: Symbol('GeocodingRedis'), - TTL_REDIS: Symbol('TTLRedis'), + REDIS: Symbol('Redis'), KAFKA: Symbol('Kafka'), } satisfies Record; /* eslint-enable @typescript-eslint/naming-convention */ diff --git a/src/common/interfaces.ts b/src/common/interfaces.ts index 3f22dfe..100b122 100644 --- a/src/common/interfaces.ts +++ b/src/common/interfaces.ts @@ -18,7 +18,6 @@ export type RedisConfig = { port: number; enableSslAuth: boolean; sslPaths: { ca: string; cert: string; key: string }; - databases: { geocodingIndex: number; ttlIndex: number }; } & RedisClientOptions; export type KafkaOptions = { diff --git a/src/common/utils.ts b/src/common/utils.ts index d7ad0bd..9af5d25 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -1,52 +1,14 @@ -import { DependencyContainer } from 'tsyringe'; -import { Logger } from '@map-colonies/js-logger'; -import { HealthCheck } from '@godaddy/terminus'; -import { RedisClient } from '../redis/index'; -import { SERVICES } from './constants'; - -export const healthCheckFactory = (container: DependencyContainer): HealthCheck => { - return async (): Promise => { - const logger = container.resolve(SERVICES.LOGGER); - const geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS); - const ttlRedis = container.resolve(SERVICES.TTL_REDIS); - - const promises: Promise[] = []; - - promises.push( - new Promise((resolve, reject) => { - geocodingRedis - .ping() - .then(() => { - resolve('Healthcheck passed for GeocodingRedis connection'); - }) - .catch((error: Error) => { - reject({ - msg: `Healthcheck failed for GeocodingRedis.`, - error, - }); - }); - }) - ); - promises.push( - new Promise((resolve, reject) => { - ttlRedis - .ping() - .then(() => { - resolve('Healthcheck passed for TTLRedis connection'); - }) - .catch((error: Error) => { - reject({ - msg: `Healthcheck failed for TTLRedis.`, - error, - }); - }); - }) - ); - - await Promise.allSettled(promises).then((results) => { - results.forEach((msg) => logger.debug({ msg })); - }); - - return Promise.resolve(); - }; +import { TimeoutError } from './errors'; + +export const promiseTimeout = async (ms: number, promise: Promise): Promise => { + // create a promise that rejects in milliseconds + const timeout = new Promise((_, reject) => { + const id = setTimeout(() => { + clearTimeout(id); + reject(new TimeoutError(`Timed out in + ${ms} + ms.`)); + }, ms); + }); + + // returns a race between our timeout and the passed in promise + return Promise.race([promise, timeout]); }; diff --git a/src/containerConfig.ts b/src/containerConfig.ts index b99a215..a6f05fd 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -12,10 +12,9 @@ import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, REDIS_CLIENT_FACTORY, REDIS_S import { tracing } from './common/tracing'; import { feedbackRouterFactory, FEEDBACK_ROUTER_SYMBOL } from './feedback/routes/feedbackRouter'; import { InjectionObject, registerDependencies } from './common/dependencyRegistration'; -import { RedisClientFactory } from './redis'; +import { healthCheckFunctionFactory, RedisClient, RedisClientFactory } from './redis'; import { kafkaClientFactory } from './kafka'; import { redisSubscribe } from './redis/subscribe'; -import { healthCheckFactory } from './common/utils'; export interface RegisterOptions { override?: InjectionObject[]; @@ -80,8 +79,8 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise postInjectionHook: async (deps: DependencyContainer): Promise => { const redisFactory = deps.resolve(REDIS_CLIENT_FACTORY); - for (const redisIndex of [SERVICES.GEOCODING_REDIS, SERVICES.TTL_REDIS, REDIS_SUB]) { - const redis = redisFactory.createRedisClient(redisIndex); + for (const redisIndex of [SERVICES.REDIS, REDIS_SUB]) { + const redis = redisFactory.createRedisClient(); deps.register(redisIndex, { useValue: redis }); cleanupRegistry.register({ func: async (): Promise => { @@ -105,8 +104,9 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise { token: HEALTHCHECK, provider: { - useFactory: (depContainer): HealthCheck => { - return healthCheckFactory(depContainer); + useFactory: (container): HealthCheck => { + const redis = container.resolve(SERVICES.REDIS); + return healthCheckFunctionFactory(redis); }, }, }, diff --git a/src/feedback/models/feedbackManager.ts b/src/feedback/models/feedbackManager.ts index 7fae20e..0847f7d 100644 --- a/src/feedback/models/feedbackManager.ts +++ b/src/feedback/models/feedbackManager.ts @@ -11,7 +11,7 @@ import { IFeedbackModel } from './feedback'; export class FeedbackManager { public constructor( @inject(SERVICES.LOGGER) private readonly logger: Logger, - @inject(SERVICES.GEOCODING_REDIS) private readonly geocodingRedis: RedisClient, + @inject(SERVICES.REDIS) private readonly redisClient: RedisClient, @inject(SERVICES.KAFKA) private readonly kafkaProducer: Producer, @inject(SERVICES.CONFIG) private readonly config: IConfig ) {} @@ -32,7 +32,7 @@ export class FeedbackManager { responseTime: new Date(), geocodingResponse: await this.getGeocodingResponse(requestId, userId, apiKey), }; - await this.geocodingRedis.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse)); + await this.redisClient.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse)); this.logger.info({ msg: 'creating feedback', requestId }); await this.send(feedbackResponse); @@ -41,7 +41,7 @@ export class FeedbackManager { public async getGeocodingResponse(requestId: string, userId: string, apiKey: string): Promise { try { - const redisResponse = (await this.geocodingRedis.get(requestId)) as string; + const redisResponse = (await this.redisClient.get(requestId)) as string; if (redisResponse) { const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; geocodingResponse.userId = userId; diff --git a/src/redis/index.ts b/src/redis/index.ts index ace9939..3998003 100644 --- a/src/redis/index.ts +++ b/src/redis/index.ts @@ -1,16 +1,18 @@ import { readFileSync } from 'fs'; import { inject, injectable } from 'tsyringe'; import { Logger } from '@map-colonies/js-logger'; +import { HealthCheck } from '@godaddy/terminus'; import { createClient, RedisClientOptions } from 'redis'; import { SERVICES } from '../common/constants'; import { RedisConfig, IConfig } from '../common/interfaces'; +import { promiseTimeout } from '../common/utils'; @injectable() export class RedisClientFactory { public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {} - public createConnectionOptions(redisConfig: RedisConfig, isGeocodingRedis: boolean): Partial { - const { host, port, enableSslAuth, sslPaths, databases, ...clientOptions } = redisConfig; + public createConnectionOptions(redisConfig: RedisConfig): Partial { + const { host, port, enableSslAuth, sslPaths, ...clientOptions } = redisConfig; clientOptions.socket = { host, port }; if (enableSslAuth) { clientOptions.socket = { @@ -21,19 +23,12 @@ export class RedisClientFactory { ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined, }; } - if (isGeocodingRedis) { - clientOptions.database = databases.geocodingIndex; - } else { - clientOptions.database = databases.ttlIndex; - } return clientOptions; } - public createRedisClient(redisIndex: symbol): RedisClient { + public createRedisClient(): RedisClient { const dbConfig = this.config.get('redis'); - - const isGeocodingRedis: boolean = redisIndex === SERVICES.GEOCODING_REDIS; - const connectionOptions = this.createConnectionOptions(dbConfig, isGeocodingRedis); + const connectionOptions = this.createConnectionOptions(dbConfig); const redisClient = createClient(connectionOptions) .on('error', (error: Error) => this.logger.error({ msg: 'redis client errored', err: error })) @@ -47,3 +42,14 @@ export class RedisClientFactory { } export type RedisClient = ReturnType; + +export const CONNECTION_TIMEOUT = 5000; + +export const healthCheckFunctionFactory = (redis: RedisClient): HealthCheck => { + return async (): Promise => { + const check = redis.ping().then(() => { + return; + }); + return promiseTimeout(CONNECTION_TIMEOUT, check); + }; +}; diff --git a/src/redis/subscribe.ts b/src/redis/subscribe.ts index fc72cf6..ebdd5f2 100644 --- a/src/redis/subscribe.ts +++ b/src/redis/subscribe.ts @@ -6,6 +6,8 @@ import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfac import { NotFoundError } from '../common/errors'; import { RedisClient } from '../redis/index'; +const TTL_PREFIX = 'ttl_'; + export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise => { const topic = config.get('outputTopic'); logger.info(`Kafka send message. Topic: ${topic}`); @@ -22,35 +24,39 @@ export const send = async (message: FeedbackResponse, logger: Logger, config: IC }; export const redisSubscribe = async (deps: DependencyContainer): Promise => { - const geocodingRedis = deps.resolve(SERVICES.GEOCODING_REDIS); - const ttlRedis = deps.resolve(SERVICES.TTL_REDIS); + const redisClient = deps.resolve(SERVICES.REDIS); const config = deps.resolve(SERVICES.CONFIG); const kafkaProducer = deps.resolve(SERVICES.KAFKA); const logger = deps.resolve(SERVICES.LOGGER); const subscriber = deps.resolve(REDIS_SUB); logger.debug('Redis subscriber init'); - const ttlDB = config.get('redis.databases.ttlIndex'); - const geocodingDB = config.get('redis.databases.geocodingIndex'); const redisTTL = config.get('redis.ttl'); - await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => { - logger.info(`Redis: Got new request ${message}`); - // eslint-disable-next-line @typescript-eslint/naming-convention - await ttlRedis.set(message, '', { EX: redisTTL }); + await subscriber.subscribe(`__keyevent@0__:set`, async (message) => { + if (!message.startsWith(TTL_PREFIX)) { + logger.info(`Redis: Got new request ${message}`); + const ttlMessage = TTL_PREFIX + message; + // eslint-disable-next-line @typescript-eslint/naming-convention + await redisClient.set(ttlMessage, '', { EX: redisTTL }); + } }); - await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => { - let wasUsed; - const redisResponse = (await geocodingRedis.get(message)) as string; - if (redisResponse) { - const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; - wasUsed = geocodingResponse.wasUsed; - } - if (!(wasUsed ?? false)) { - await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis); + await subscriber.subscribe(`__keyevent@0__:expired`, async (message: string) => { + if (message.startsWith(TTL_PREFIX)) { + const geocodingMessage = message.substring(TTL_PREFIX.length); + + let wasUsed; + const redisResponse = (await redisClient.get(geocodingMessage)) as string; + if (redisResponse) { + const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; + wasUsed = geocodingResponse.wasUsed; + } + if (!(wasUsed ?? false)) { + await sendNoChosenResult(geocodingMessage, logger, config, kafkaProducer, redisClient); + } + await redisClient.del(geocodingMessage); } - await geocodingRedis.del(message); }); return subscriber; }; @@ -60,21 +66,21 @@ export const sendNoChosenResult = async ( logger: Logger, config: IConfig, kafkaProducer: Producer, - geocodingRedis: RedisClient + redisClient: RedisClient ): Promise => { const feedbackResponse: FeedbackResponse = { requestId, chosenResultId: null, userId: '', responseTime: new Date(), - geocodingResponse: await getNoChosenGeocodingResponse(requestId, logger, geocodingRedis), + geocodingResponse: await getNoChosenGeocodingResponse(requestId, logger, redisClient), }; await send(feedbackResponse, logger, config, kafkaProducer); }; -export const getNoChosenGeocodingResponse = async (requestId: string, logger: Logger, geocodingRedis: RedisClient): Promise => { +export const getNoChosenGeocodingResponse = async (requestId: string, logger: Logger, redisClient: RedisClient): Promise => { try { - const redisResponse = await geocodingRedis.get(requestId); + const redisResponse = await redisClient.get(requestId); if (redisResponse != null) { const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse; return geocodingResponse; diff --git a/tests/integration/docs/docs.spec.ts b/tests/integration/docs/docs.spec.ts index 8239d5a..5761f27 100644 --- a/tests/integration/docs/docs.spec.ts +++ b/tests/integration/docs/docs.spec.ts @@ -16,8 +16,7 @@ describe('docs', function () { override: [ { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, - { token: SERVICES.GEOCODING_REDIS, provider: { useValue: {} } }, - { token: SERVICES.TTL_REDIS, provider: { useValue: {} } }, + { token: SERVICES.REDIS, provider: { useValue: {} } }, { token: REDIS_SUB, provider: { useValue: {} } }, { token: SERVICES.KAFKA, provider: { useValue: {} } }, ], diff --git a/tests/integration/feedback/feedback.spec.ts b/tests/integration/feedback/feedback.spec.ts index 8b43b1e..4d5d396 100644 --- a/tests/integration/feedback/feedback.spec.ts +++ b/tests/integration/feedback/feedback.spec.ts @@ -23,7 +23,7 @@ const mockKafkaProducer = { describe('feedback', function () { let requestSender: FeedbackRequestSender; - let geocodingRedis: RedisClient; + let redisClient: RedisClient; let depContainer: DependencyContainer; beforeAll(async function () { @@ -36,7 +36,7 @@ describe('feedback', function () { useChild: true, }); requestSender = new FeedbackRequestSender(app); - geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS); + redisClient = container.resolve(SERVICES.REDIS); depContainer = container; }); @@ -56,7 +56,7 @@ describe('feedback', function () { respondedAt: new Date('2024-08-29T14:39:10.602Z'), }; const redisKey = crypto.randomUUID(); - await geocodingRedis.set(redisKey, JSON.stringify(geocodingResponse)); + await redisClient.set(redisKey, JSON.stringify(geocodingResponse)); const feedbackModel: IFeedbackModel = { request_id: redisKey, @@ -77,12 +77,12 @@ describe('feedback', function () { }; const redisKey = crypto.randomUUID(); - await geocodingRedis.set(redisKey, JSON.stringify(geocodingResponse)); - expect(await geocodingRedis.exists(redisKey)).toBe(1); + await redisClient.set(redisKey, JSON.stringify(geocodingResponse)); + expect(await redisClient.exists(redisKey)).toBe(1); // eslint-disable-next-line @typescript-eslint/no-misused-promises setTimeout(async () => { - expect(await geocodingRedis.exists(redisKey)).toBe(0); + expect(await redisClient.exists(redisKey)).toBe(0); }, 3000); }); @@ -96,7 +96,7 @@ describe('feedback', function () { response: JSON.parse('["USA"]') as JSON, respondedAt: new Date(), }; - await geocodingRedis.set(requestId, JSON.stringify(geocodingResponse)); + await redisClient.set(requestId, JSON.stringify(geocodingResponse)); await new Promise((resolve) => setTimeout(resolve, 3000)); @@ -146,7 +146,7 @@ describe('feedback', function () { const requestId = crypto.randomUUID(); - depContainer.register(SERVICES.GEOCODING_REDIS, { useValue: mockRedis }); + depContainer.register(SERVICES.REDIS, { useValue: mockRedis }); depContainer.register(SERVICES.LOGGER, { useValue: mockLogger }); (mockRedis.get as jest.Mock).mockRejectedValue(new Error('Redis get failed')); @@ -213,7 +213,7 @@ describe('feedback', function () { info: jest.fn(), } as unknown as jest.Mocked; - depContainer.register(SERVICES.GEOCODING_REDIS, { useValue: mockRedis }); + depContainer.register(SERVICES.REDIS, { useValue: mockRedis }); const requestId = crypto.randomUUID(); (mockRedis.get as jest.Mock).mockResolvedValue(null);