diff --git a/.github/workflows/build-and-push.yaml b/.github/workflows/build-and-push.yaml
index 3b57d09..8f4c0bf 100644
--- a/.github/workflows/build-and-push.yaml
+++ b/.github/workflows/build-and-push.yaml
@@ -1,32 +1,23 @@
-name: Publish Release
+name: Build and push artifacts
on:
push:
tags:
- - 'v*'
+ - "v*"
-jobs:
-
- build_and_test:
- runs-on: ubuntu-latest
+ workflow_dispatch:
+ inputs:
+ version:
+ required: true
+ type: string
+env:
+ HELM_EXPERIMENTAL_OCI: 1
- steps:
- - name: Azure Pipelines Action
- uses: Azure/pipelines@v1
- with:
- azure-devops-project-url: https://dev.azure.com/Libot-Mipui-Org/osm-sync-tracker
- azure-pipeline-name: 'build-and-push-to-ACR'
- azure-devops-token: ${{ secrets.AZURE_DEVOPS_TOKEN }}
+permissions:
+ contents: write
+ pull-requests: write
- publish_release:
- runs-on: ubuntu-latest
- steps:
- - name: Checkout code for CHANGELOG.md
- uses: actions/checkout@v2
-
- - name: Publish Release to Github
- uses: softprops/action-gh-release@v1
- with:
- body_path: CHANGELOG.md
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+jobs:
+ build_and_push_docker:
+ uses: MapColonies/shared-workflows/.github/workflows/build-and-push-docker.yaml@v2
+ secrets: inherit
diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml
index 562bce8..60d48e7 100644
--- a/.github/workflows/pull_request.yaml
+++ b/.github/workflows/pull_request.yaml
@@ -23,7 +23,6 @@ jobs:
uses: wearerequired/lint-action@v1
with:
github_token: ${{ secrets.github_token }}
- # Enable linters
eslint: true
prettier: true
eslint_extensions: ts
@@ -38,46 +37,17 @@ jobs:
tests:
name: Run Tests
runs-on: ubuntu-latest
- container: node:16
services:
- # Label used to access the service container
redis:
- # Docker Hub image
- image: bitnami/redis:6.2.7
- # Provide the password for postgres
+ image: redis:7.2.3
env:
- REDIS_ENABLE_SSL_AUTH: false
- REDIS_DATABASE: 1
ALLOW_EMPTY_PASSWORD: yes
ports:
- 6379:6379
- # Set health checks to wait until postgres has started
options: >-
- --health-cmd "redis-cli ping || exit 1"
- --health-interval 10s
- --health-timeout 5s
- --health-retries 5
-
- zookeeper:
- image: bitnami/zookeeper:3.8.0
- ports:
- - 2181:2181
- env:
- ALLOW_ANONYMOUS_LOGIN: yes
-
- kafka:
- image: bitnami/kafka:3.8.0
- ports:
- - 9092:9092
- env:
- KAFKA_BROKER_ID: 1
- KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
- ALLOW_PLAINTEXT_LISTENER: yes
- options: >-
- --health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"
+ --health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
@@ -98,22 +68,10 @@ jobs:
- name: Install Node.js dependencies
run: npm ci
- - name: Install Docker
- run: |
- apt-get update
- apt-get install -y docker.io
-
- - name: Wait for Kafka
- run: |
- until docker exec $(docker ps -qf "name=kafka") kafka-topics.sh --list --bootstrap-server localhost:9092; do
- echo "Waiting for Kafka to be ready...";
- sleep 5;
- done
-
- - name: Create Kafka testTopic
+ - name: Create Redis PubSub
run: |
- docker exec $(docker ps -qf "name=kafka") \
- kafka-topics.sh --create --topic testTopic --bootstrap-server localhost:9092
+ docker exec $(docker ps -qf "name=redis") \
+ redis-cli config set notify-keyspace-events KEA
- name: Run tests
run: npm run test
@@ -142,4 +100,4 @@ jobs:
uses: actions/checkout@v2
- name: build Docker image
- run: docker build -t test-build:latest .
+ run: docker build -t test-build:latest .
\ No newline at end of file
diff --git a/README.md b/README.md
index f24be1b..05fd0e5 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,25 @@
# Feedback-Api
This is an API that connects to MapColonies geocoding service, and get's feedback from the users.
+Geocoding's feedback API collects usage data from the [Geocoding API](https://github.com/MapColonies/Geocoding) user's response and stores it for BI purposes. We use this data to better understand and measure the relevance of our responses and adjust the data and algorithm accordingly.
+
## API
Checkout the OpenAPI spec [here](/openapi3.yaml)
+## Workflow
+
+
+## How does it work?
+Once a Geocoding user searches for something using Geocoding's api, it enters Redis (in the geocoding DB index), and an event is triggered. This event adds the `requestId` (from geocoding) to a different Redis DB with a ttl of 5 minutes (TTL DB index).
+When the Geocoding user chooses a result, the requesting system will then send the `request_id`, the `chosen_response_id`, and the `user_id` back to us using the [Feedback api](/openapi3.yaml).
+Once we get the chosen response from the api, we validate it and make sure it exists in the Redis geocoding DB, and once it is validated we add a `wasUsed = true` parameter to the geocoding response (in Redis) for later use. We then send the response to Kafka (where it will later be enriched and added to elastic -> see [Geocoding Enrichment](https://github.com/MapColonies/geocoding-enrichment) for more details).
+Once the TTL of the `requestId` expires an even is triggered, and there are two options:
+1. One or more responses were chosen.
+2. No response was chosen (only searched for).
+
+We now go back to the `wasUsed` parameter from earlier, and check to see its value (or even if it exists).
+- If `wasUsed = true` we know from earlier that this response was already chosen and sent to Kafka previously, therefore we can remove it from the Redis geocoding DB index.
+- If `wasUsed` equals to one of the following: `false`, `null` or `undefined` then we create a new feedback response and make the `chosenResultId` `null`, and then send this response to Kafka, and delete the request from the Redis geocoding DB index.
+
## Installation
Install deps with npm
diff --git a/config/custom-environment-variables.json b/config/custom-environment-variables.json
index 12ec271..699b4cc 100644
--- a/config/custom-environment-variables.json
+++ b/config/custom-environment-variables.json
@@ -56,8 +56,12 @@
"key": "REDIS_KEY_PATH",
"cert": "REDIS_CERT_PATH"
},
- "database": {
- "__name": "REDIS_DATABASE",
+ "databases": {
+ "geocodingIndex": "REDIS_GEOCODING_INDEX",
+ "ttlIndex": "REDIS_TTL_INDEX"
+ },
+ "ttl": {
+ "__name": "REDIS_TTL",
"__format": "number"
}
},
diff --git a/config/default.json b/config/default.json
index 8108e8b..ea07a3d 100644
--- a/config/default.json
+++ b/config/default.json
@@ -36,7 +36,11 @@
"key": "",
"cert": ""
},
- "database": 0
+ "databases": {
+ "geocodingIndex": 0,
+ "ttlIndex": 1
+ },
+ "ttl": 300
},
"kafka": {
"brokers": "KAFKA_URL:9092",
diff --git a/config/test.json b/config/test.json
index 8e7ddff..ba0e198 100644
--- a/config/test.json
+++ b/config/test.json
@@ -1,6 +1,6 @@
{
"redis": {
- "host": "redis",
+ "host": "localhost",
"port": 6379,
"username": "",
"password": "",
@@ -10,7 +10,11 @@
"key": "",
"cert": ""
},
- "database": 1
+ "databases": {
+ "geocodingIndex": 2,
+ "ttlIndex": 3
+ },
+ "ttl": 2
},
"kafka": {
"brokers": "kafka:9092"
diff --git a/package.json b/package.json
index 8f9aa63..1cf1865 100644
--- a/package.json
+++ b/package.json
@@ -5,7 +5,7 @@
"main": "./src/index.ts",
"scripts": {
"test:unit": "jest --config=./tests/configurations/unit/jest.config.js",
- "test:integration": "jest --config=./tests/configurations/integration/jest.config.js --forceExit",
+ "test:integration": "jest --config=./tests/configurations/integration/jest.config.js",
"format": "prettier --check .",
"format:fix": "prettier --write .",
"prelint:fix": "npm run format:fix",
diff --git a/src/common/constants.ts b/src/common/constants.ts
index 64d90fa..96c5961 100644
--- a/src/common/constants.ts
+++ b/src/common/constants.ts
@@ -7,16 +7,19 @@ export const IGNORED_OUTGOING_TRACE_ROUTES = [/^.*\/v1\/metrics.*$/];
export const IGNORED_INCOMING_TRACE_ROUTES = [/^.*\/docs.*$/];
/* eslint-disable @typescript-eslint/naming-convention */
-export const SERVICES: Record = {
+export const SERVICES = {
LOGGER: Symbol('Logger'),
CONFIG: Symbol('Config'),
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
- REDIS: Symbol('Redis'),
+ GEOCODING_REDIS: Symbol('GeocodingRedis'),
+ TTL_REDIS: Symbol('TTLRedis'),
KAFKA: Symbol('Kafka'),
-};
+} satisfies Record;
/* eslint-enable @typescript-eslint/naming-convention */
export const ON_SIGNAL = Symbol('onSignal');
export const HEALTHCHECK = Symbol('healthcheck');
export const CLEANUP_REGISTRY = Symbol('cleanupRegistry');
+export const REDIS_SUB = Symbol('redisSubClient');
+export const REDIS_CLIENT_FACTORY = Symbol('redisClientFactory');
diff --git a/src/common/dependencyRegistration.ts b/src/common/dependencyRegistration.ts
index e90304a..cbf8803 100644
--- a/src/common/dependencyRegistration.ts
+++ b/src/common/dependencyRegistration.ts
@@ -8,6 +8,7 @@ export interface InjectionObject {
provider: Providers;
options?: RegistrationOptions;
postInjectionHook?: (container: DependencyContainer) => Promise;
+ afterAllInjectionHook?: (container: DependencyContainer) => void | Promise;
}
export const registerDependencies = async (
@@ -23,5 +24,8 @@ export const registerDependencies = async (
await injectionObj.postInjectionHook?.(container);
}
+ for (const dep of dependencies) {
+ await dep.afterAllInjectionHook?.(container);
+ }
return container;
};
diff --git a/src/common/interfaces.ts b/src/common/interfaces.ts
index 3945fd9..3f22dfe 100644
--- a/src/common/interfaces.ts
+++ b/src/common/interfaces.ts
@@ -18,6 +18,7 @@ export type RedisConfig = {
port: number;
enableSslAuth: boolean;
sslPaths: { ca: string; cert: string; key: string };
+ databases: { geocodingIndex: number; ttlIndex: number };
} & RedisClientOptions;
export type KafkaOptions = {
@@ -28,16 +29,17 @@ export type KafkaOptions = {
export interface FeedbackResponse {
requestId: string;
- chosenResultId: number;
+ chosenResultId: number | null;
userId: string;
responseTime: Date; // from FeedbackApi
geocodingResponse: GeocodingResponse;
}
export interface GeocodingResponse {
- userId: string;
+ userId?: string;
apiKey: string;
site: string;
response: JSON;
respondedAt: Date; // from Geocoding
+ wasUsed?: boolean;
}
diff --git a/src/common/utils.ts b/src/common/utils.ts
index 9af5d25..f592234 100644
--- a/src/common/utils.ts
+++ b/src/common/utils.ts
@@ -1,14 +1,34 @@
-import { TimeoutError } from './errors';
+import { DependencyContainer, FactoryFunction } from 'tsyringe';
+import { Logger } from '@map-colonies/js-logger';
+import { RedisClient } from '../redis/index';
+import { SERVICES } from './constants';
-export const promiseTimeout = async (ms: number, promise: Promise): Promise => {
- // create a promise that rejects in milliseconds
- const timeout = new Promise((_, reject) => {
- const id = setTimeout(() => {
- clearTimeout(id);
- reject(new TimeoutError(`Timed out in + ${ms} + ms.`));
- }, ms);
- });
+export const healthCheckFactory: FactoryFunction = (container: DependencyContainer): void => {
+ const logger = container.resolve(SERVICES.LOGGER);
+ const geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS);
+ const ttlRedis = container.resolve(SERVICES.TTL_REDIS);
- // returns a race between our timeout and the passed in promise
- return Promise.race([promise, timeout]);
+ geocodingRedis
+ .ping()
+ .then(() => {
+ return;
+ })
+ .catch((error: Error) => {
+ logger.error({
+ message: `Healthcheck failed for GeocodingRedis.`,
+ error,
+ });
+ });
+
+ ttlRedis
+ .ping()
+ .then(() => {
+ return;
+ })
+ .catch((error: Error) => {
+ logger.error({
+ message: `Healthcheck failed for GeocodingRedis.`,
+ error,
+ });
+ });
};
diff --git a/src/containerConfig.ts b/src/containerConfig.ts
index df951e1..675945e 100644
--- a/src/containerConfig.ts
+++ b/src/containerConfig.ts
@@ -7,13 +7,15 @@ import jsLogger, { LoggerOptions } from '@map-colonies/js-logger';
import { CleanupRegistry } from '@map-colonies/cleanup-registry';
import { Metrics } from '@map-colonies/telemetry';
import { instancePerContainerCachingFactory } from 'tsyringe';
-import { HealthCheck } from '@godaddy/terminus';
-import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, SERVICES, SERVICE_NAME } from './common/constants';
+import { createClient } from 'redis';
+import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, REDIS_CLIENT_FACTORY, REDIS_SUB, SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { feedbackRouterFactory, FEEDBACK_ROUTER_SYMBOL } from './feedback/routes/feedbackRouter';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
-import { healthCheckFunctionFactory, RedisClient, redisClientFactory } from './redis';
+import { RedisClient, RedisClientFactory } from './redis';
import { kafkaClientFactory } from './kafka';
+import { redisSubscribe } from './redis/subscribe';
+import { healthCheckFactory } from './common/utils';
export interface RegisterOptions {
override?: InjectionObject[];
@@ -44,22 +46,26 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
{
token: CLEANUP_REGISTRY,
provider: { useValue: cleanupRegistry },
- },
- {
- token: SERVICES.REDIS,
- provider: { useFactory: instancePerContainerCachingFactory(redisClientFactory) },
- postInjectionHook: async (deps: DependencyContainer): Promise => {
- const redis = deps.resolve(SERVICES.REDIS);
- cleanupRegistry.register({ func: redis.disconnect.bind(redis), id: SERVICES.REDIS });
- await redis.connect();
+ afterAllInjectionHook(): void {
+ const cleanupRegistryLogger = logger.child({ subComponent: 'cleanupRegistry' });
+
+ cleanupRegistry.on('itemFailed', (id, error, msg) => cleanupRegistryLogger.error({ msg, itemId: id, err: error }));
+ cleanupRegistry.on('itemCompleted', (id) => cleanupRegistryLogger.info({ itemId: id, msg: `cleanup finished for item ${id.toString()}` }));
+ cleanupRegistry.on('finished', (status) => cleanupRegistryLogger.info({ msg: `cleanup registry finished cleanup`, status }));
},
},
{
token: SERVICES.KAFKA,
- provider: { useFactory: kafkaClientFactory },
+ provider: { useFactory: instancePerContainerCachingFactory(kafkaClientFactory) },
postInjectionHook: async (deps: DependencyContainer): Promise => {
const kafkaProducer = deps.resolve(SERVICES.KAFKA);
- cleanupRegistry.register({ func: kafkaProducer.disconnect.bind(kafkaProducer), id: SERVICES.KAFKA });
+ cleanupRegistry.register({
+ func: async (): Promise => {
+ await kafkaProducer.disconnect();
+ return Promise.resolve();
+ },
+ id: SERVICES.KAFKA,
+ });
try {
await kafkaProducer.connect();
logger.info('Connected to Kafka');
@@ -69,21 +75,52 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
},
},
{
- token: HEALTHCHECK,
+ token: REDIS_CLIENT_FACTORY,
+ provider: { useClass: RedisClientFactory },
+ postInjectionHook: async (deps: DependencyContainer): Promise => {
+ const redisFactory = deps.resolve(REDIS_CLIENT_FACTORY);
+ for (const redisIndex of [SERVICES.GEOCODING_REDIS, SERVICES.TTL_REDIS]) {
+ const redis = redisFactory.createRedisClient(redisIndex);
+ deps.register(redisIndex, { useValue: redis });
+ cleanupRegistry.register({
+ func: async (): Promise => {
+ await redis.quit();
+ return Promise.resolve();
+ },
+ id: redisIndex,
+ });
+ await redis.connect();
+ logger.info(`Connected to ${redisIndex.toString()}`);
+ }
+ },
+ },
+ { token: HEALTHCHECK, provider: { useFactory: healthCheckFactory } },
+ {
+ token: REDIS_SUB,
provider: {
- useFactory: (container): HealthCheck => {
- const redis = container.resolve(SERVICES.REDIS);
- return healthCheckFunctionFactory(redis);
- },
+ useFactory: instancePerContainerCachingFactory((): RedisClient => {
+ const subscriber = createClient();
+ return subscriber;
+ }),
+ },
+ postInjectionHook: async (deps: DependencyContainer): Promise => {
+ const subscriber = deps.resolve(REDIS_SUB);
+ cleanupRegistry.register({
+ func: async () => {
+ await subscriber.quit();
+ return Promise.resolve();
+ },
+ id: REDIS_SUB,
+ });
+ await subscriber.connect();
+ logger.info('Connected to Redis Subscriber');
+ await redisSubscribe(deps);
},
},
{
token: ON_SIGNAL,
provider: {
- useFactory: instancePerContainerCachingFactory((container) => {
- const cleanupRegistry = container.resolve(CLEANUP_REGISTRY);
- return cleanupRegistry.trigger.bind(cleanupRegistry);
- }),
+ useValue: cleanupRegistry.trigger.bind(cleanupRegistry),
},
},
];
diff --git a/src/feedback/models/feedbackManager.ts b/src/feedback/models/feedbackManager.ts
index 569bf20..7fae20e 100644
--- a/src/feedback/models/feedbackManager.ts
+++ b/src/feedback/models/feedbackManager.ts
@@ -11,7 +11,7 @@ import { IFeedbackModel } from './feedback';
export class FeedbackManager {
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
- @inject(SERVICES.REDIS) private readonly redis: RedisClient,
+ @inject(SERVICES.GEOCODING_REDIS) private readonly geocodingRedis: RedisClient,
@inject(SERVICES.KAFKA) private readonly kafkaProducer: Producer,
@inject(SERVICES.CONFIG) private readonly config: IConfig
) {}
@@ -32,19 +32,21 @@ export class FeedbackManager {
responseTime: new Date(),
geocodingResponse: await this.getGeocodingResponse(requestId, userId, apiKey),
};
+ await this.geocodingRedis.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse));
+
this.logger.info({ msg: 'creating feedback', requestId });
await this.send(feedbackResponse);
return feedbackResponse;
}
public async getGeocodingResponse(requestId: string, userId: string, apiKey: string): Promise {
- const redisClient = this.redis;
try {
- const redisResponse = (await redisClient.get(requestId)) as string;
+ const redisResponse = (await this.geocodingRedis.get(requestId)) as string;
if (redisResponse) {
const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
geocodingResponse.userId = userId;
geocodingResponse.apiKey = apiKey;
+ geocodingResponse.wasUsed = true;
return geocodingResponse;
}
} catch (error) {
@@ -56,16 +58,11 @@ export class FeedbackManager {
public async send(message: FeedbackResponse): Promise {
const topic = this.config.get('outputTopic');
- this.logger.info(`Kafka Send message. Topic: ${topic}`);
+ this.logger.info(`Kafka send message. Topic: ${topic}`);
try {
- await this.kafkaProducer.connect();
await this.kafkaProducer.send({
topic,
- messages: [
- {
- value: JSON.stringify(message),
- },
- ],
+ messages: [{ value: JSON.stringify(message) }],
});
this.logger.info(`Kafka message sent. Topic: ${topic}`);
} catch (error) {
diff --git a/src/redis/index.ts b/src/redis/index.ts
index abd4183..ace9939 100644
--- a/src/redis/index.ts
+++ b/src/redis/index.ts
@@ -1,52 +1,49 @@
import { readFileSync } from 'fs';
+import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
-import { HealthCheck } from '@godaddy/terminus';
import { createClient, RedisClientOptions } from 'redis';
-import { DependencyContainer, FactoryFunction } from 'tsyringe';
import { SERVICES } from '../common/constants';
import { RedisConfig, IConfig } from '../common/interfaces';
-import { promiseTimeout } from '../common/utils';
-const createConnectionOptions = (redisConfig: RedisConfig): Partial => {
- const { host, port, enableSslAuth, sslPaths, ...clientOptions } = redisConfig;
- clientOptions.socket = { host, port };
- if (enableSslAuth) {
- clientOptions.socket = {
- ...clientOptions.socket,
- tls: true,
- key: sslPaths.key !== '' ? readFileSync(sslPaths.key) : undefined,
- cert: sslPaths.cert !== '' ? readFileSync(sslPaths.cert) : undefined,
- ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined,
- };
- }
- return clientOptions;
-};
+@injectable()
+export class RedisClientFactory {
+ public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {}
-export const CONNECTION_TIMEOUT = 5000;
+ public createConnectionOptions(redisConfig: RedisConfig, isGeocodingRedis: boolean): Partial {
+ const { host, port, enableSslAuth, sslPaths, databases, ...clientOptions } = redisConfig;
+ clientOptions.socket = { host, port };
+ if (enableSslAuth) {
+ clientOptions.socket = {
+ ...clientOptions.socket,
+ tls: true,
+ key: sslPaths.key !== '' ? readFileSync(sslPaths.key) : undefined,
+ cert: sslPaths.cert !== '' ? readFileSync(sslPaths.cert) : undefined,
+ ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined,
+ };
+ }
+ if (isGeocodingRedis) {
+ clientOptions.database = databases.geocodingIndex;
+ } else {
+ clientOptions.database = databases.ttlIndex;
+ }
+ return clientOptions;
+ }
-export type RedisClient = ReturnType;
+ public createRedisClient(redisIndex: symbol): RedisClient {
+ const dbConfig = this.config.get('redis');
-export const redisClientFactory: FactoryFunction = (container: DependencyContainer): RedisClient => {
- const logger = container.resolve(SERVICES.LOGGER);
- const config = container.resolve(SERVICES.CONFIG);
- const dbConfig = config.get('redis');
- const connectionOptions = createConnectionOptions(dbConfig);
+ const isGeocodingRedis: boolean = redisIndex === SERVICES.GEOCODING_REDIS;
+ const connectionOptions = this.createConnectionOptions(dbConfig, isGeocodingRedis);
- const redisClient = createClient(connectionOptions)
- .on('error', (error: Error) => logger.error({ msg: 'redis client errored', err: error }))
- .on('reconnecting', (...args) => logger.warn({ msg: 'redis client reconnecting', ...args }))
- .on('end', (...args) => logger.info({ msg: 'redis client end', ...args }))
- .on('connect', (...args) => logger.debug({ msg: 'redis client connected', ...args }))
- .on('ready', (...args) => logger.debug({ msg: 'redis client is ready', ...args }));
+ const redisClient = createClient(connectionOptions)
+ .on('error', (error: Error) => this.logger.error({ msg: 'redis client errored', err: error }))
+ .on('reconnecting', (...args) => this.logger.warn({ msg: 'redis client reconnecting', ...args }))
+ .on('end', (...args) => this.logger.info({ msg: 'redis client end', ...args }))
+ .on('connect', (...args) => this.logger.debug({ msg: 'redis client connected', ...args }))
+ .on('ready', (...args) => this.logger.debug({ msg: 'redis client is ready', ...args }));
- return redisClient;
-};
+ return redisClient;
+ }
+}
-export const healthCheckFunctionFactory = (redis: RedisClient): HealthCheck => {
- return async (): Promise => {
- const check = redis.ping().then(() => {
- return;
- });
- return promiseTimeout(CONNECTION_TIMEOUT, check);
- };
-};
+export type RedisClient = ReturnType;
diff --git a/src/redis/subscribe.ts b/src/redis/subscribe.ts
new file mode 100644
index 0000000..c5bd181
--- /dev/null
+++ b/src/redis/subscribe.ts
@@ -0,0 +1,86 @@
+import { Logger } from '@map-colonies/js-logger';
+import { DependencyContainer } from 'tsyringe';
+import { Producer } from 'kafkajs';
+import { REDIS_SUB, SERVICES } from '../common/constants';
+import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfaces';
+import { NotFoundError } from '../common/errors';
+import { RedisClient } from '../redis/index';
+
+export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise => {
+ const topic = config.get('outputTopic');
+ logger.info(`Kafka send message. Topic: ${topic}`);
+ try {
+ await kafkaProducer.send({
+ topic,
+ messages: [{ value: JSON.stringify(message) }],
+ });
+ logger.info(`Kafka message sent. Topic: ${topic}`);
+ } catch (error) {
+ logger.error({ msg: `Error uploading response to kafka`, message });
+ throw error;
+ }
+};
+
+export const redisSubscribe = async (deps: DependencyContainer): Promise => {
+ const geocodingRedis = deps.resolve(SERVICES.GEOCODING_REDIS);
+ const ttlRedis = deps.resolve(SERVICES.TTL_REDIS);
+ const config = deps.resolve(SERVICES.CONFIG);
+ const kafkaProducer = deps.resolve(SERVICES.KAFKA);
+ const logger = deps.resolve(SERVICES.LOGGER);
+ const subscriber = deps.resolve(REDIS_SUB);
+
+ const ttlDB = config.get('redis.databases.ttlIndex');
+ const geocodingDB = config.get('redis.databases.geocodingIndex');
+ const redisTTL = config.get('redis.ttl');
+
+ await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => {
+ logger.info(`Redis: Got new request ${message}`);
+ // eslint-disable-next-line @typescript-eslint/naming-convention
+ await ttlRedis.set(message, '', { EX: redisTTL });
+ });
+
+ await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => {
+ let wasUsed;
+ const redisResponse = (await geocodingRedis.get(message)) as string;
+ if (redisResponse) {
+ const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
+ wasUsed = geocodingResponse.wasUsed;
+ }
+ if (!(wasUsed ?? false)) {
+ await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis);
+ }
+ await geocodingRedis.del(message);
+ });
+ return subscriber;
+};
+
+export const sendNoChosenResult = async (
+ requestId: string,
+ logger: Logger,
+ config: IConfig,
+ kafkaProducer: Producer,
+ geocodingRedis: RedisClient
+): Promise => {
+ const feedbackResponse: FeedbackResponse = {
+ requestId,
+ chosenResultId: null,
+ userId: '',
+ responseTime: new Date(),
+ geocodingResponse: await getNoChosenGeocodingResponse(requestId, logger, geocodingRedis),
+ };
+ await send(feedbackResponse, logger, config, kafkaProducer);
+};
+
+export const getNoChosenGeocodingResponse = async (requestId: string, logger: Logger, geocodingRedis: RedisClient): Promise => {
+ try {
+ const redisResponse = await geocodingRedis.get(requestId);
+ if (redisResponse != null) {
+ const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
+ return geocodingResponse;
+ }
+ } catch (error) {
+ logger.error({ msg: `Redis Error: ${(error as Error).message}` });
+ throw error;
+ }
+ throw new NotFoundError(`The current request was not found ${requestId}`);
+};
diff --git a/tests/integration/docs/docs.spec.ts b/tests/integration/docs/docs.spec.ts
index 86d82a0..8239d5a 100644
--- a/tests/integration/docs/docs.spec.ts
+++ b/tests/integration/docs/docs.spec.ts
@@ -1,21 +1,36 @@
import jsLogger from '@map-colonies/js-logger';
import { trace } from '@opentelemetry/api';
import httpStatusCodes from 'http-status-codes';
+import { DependencyContainer } from 'tsyringe';
+import { CleanupRegistry } from '@map-colonies/cleanup-registry';
import { getApp } from '../../../src/app';
-import { SERVICES } from '../../../src/common/constants';
+import { CLEANUP_REGISTRY, REDIS_SUB, SERVICES } from '../../../src/common/constants';
import { DocsRequestSender } from './helpers/docsRequestSender';
describe('docs', function () {
let requestSender: DocsRequestSender;
+ let depContainer: DependencyContainer;
+
beforeAll(async function () {
- const app = await getApp({
+ const { app, container } = await getApp({
override: [
{ token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } },
{ token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } },
+ { token: SERVICES.GEOCODING_REDIS, provider: { useValue: {} } },
+ { token: SERVICES.TTL_REDIS, provider: { useValue: {} } },
+ { token: REDIS_SUB, provider: { useValue: {} } },
+ { token: SERVICES.KAFKA, provider: { useValue: {} } },
],
useChild: true,
});
- requestSender = new DocsRequestSender(app.app);
+ requestSender = new DocsRequestSender(app);
+ depContainer = container;
+ });
+
+ afterAll(async function () {
+ const cleanupRegistry = depContainer.resolve(CLEANUP_REGISTRY);
+ await cleanupRegistry.trigger();
+ depContainer.reset();
});
describe('Happy Path', function () {
diff --git a/tests/integration/feedback/feedback.spec.ts b/tests/integration/feedback/feedback.spec.ts
index d834cca..8b43b1e 100644
--- a/tests/integration/feedback/feedback.spec.ts
+++ b/tests/integration/feedback/feedback.spec.ts
@@ -1,19 +1,29 @@
/* eslint-disable @typescript-eslint/naming-convention */
-import jsLogger from '@map-colonies/js-logger';
-import Redis from 'ioredis';
+import * as crypto from 'node:crypto';
+import config from 'config';
+import jsLogger, { Logger } from '@map-colonies/js-logger';
import { DependencyContainer } from 'tsyringe';
import { Producer } from 'kafkajs';
import { trace } from '@opentelemetry/api';
import httpStatusCodes from 'http-status-codes';
+import { CleanupRegistry } from '@map-colonies/cleanup-registry';
import { getApp } from '../../../src/app';
-import { SERVICES } from '../../../src/common/constants';
+import { CLEANUP_REGISTRY, SERVICES } from '../../../src/common/constants';
import { IFeedbackModel } from '../../../src/feedback/models/feedback';
-import { GeocodingResponse } from '../../../src/common/interfaces';
+import { FeedbackResponse, GeocodingResponse } from '../../../src/common/interfaces';
+import { RedisClient } from '../../../src/redis';
+import { getNoChosenGeocodingResponse, send } from '../../../src/redis/subscribe';
+import { NotFoundError } from '../../../src/common/errors';
import { FeedbackRequestSender } from './helpers/requestSender';
+const mockKafkaProducer = {
+ connect: jest.fn(),
+ send: jest.fn(),
+} as unknown as jest.Mocked;
+
describe('feedback', function () {
let requestSender: FeedbackRequestSender;
- let redisConnection: Redis;
+ let geocodingRedis: RedisClient;
let depContainer: DependencyContainer;
beforeAll(async function () {
@@ -21,25 +31,23 @@ describe('feedback', function () {
override: [
{ token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } },
{ token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } },
+ { token: SERVICES.KAFKA, provider: { useValue: mockKafkaProducer } },
],
useChild: true,
});
requestSender = new FeedbackRequestSender(app);
- redisConnection = container.resolve(SERVICES.REDIS);
+ geocodingRedis = container.resolve(SERVICES.GEOCODING_REDIS);
depContainer = container;
});
afterAll(async function () {
- redisConnection = depContainer.resolve(SERVICES.REDIS);
- if (!['end'].includes(redisConnection.status)) {
- await redisConnection.quit();
- }
- const kafkaProducer = depContainer.resolve(SERVICES.KAFKA);
- await kafkaProducer.disconnect();
+ const cleanupRegistry = depContainer.resolve(CLEANUP_REGISTRY);
+ await cleanupRegistry.trigger();
+ depContainer.reset();
});
describe('Happy Path', function () {
- it('should return 200 status code and create the feedback', async function () {
+ it('Should return 204 status code and create the feedback', async function () {
const geocodingResponse: GeocodingResponse = {
userId: '1',
apiKey: '1',
@@ -47,8 +55,8 @@ describe('feedback', function () {
response: JSON.parse('["USA"]') as JSON,
respondedAt: new Date('2024-08-29T14:39:10.602Z'),
};
- const redisKey = '417a4635-0c59-4b5c-877c-45b4bbaaac7a';
- await redisConnection.set(redisKey, JSON.stringify(geocodingResponse));
+ const redisKey = crypto.randomUUID();
+ await geocodingRedis.set(redisKey, JSON.stringify(geocodingResponse));
const feedbackModel: IFeedbackModel = {
request_id: redisKey,
@@ -59,40 +67,134 @@ describe('feedback', function () {
expect(response.status).toBe(httpStatusCodes.NO_CONTENT);
});
+
+ it('Redis key should not exist in geocodingIndex after TTL has passed', async function () {
+ const geocodingResponse: GeocodingResponse = {
+ apiKey: '1',
+ site: 'test',
+ response: JSON.parse('["USA"]') as JSON,
+ respondedAt: new Date('2024-08-29T14:39:10.602Z'),
+ };
+ const redisKey = crypto.randomUUID();
+
+ await geocodingRedis.set(redisKey, JSON.stringify(geocodingResponse));
+ expect(await geocodingRedis.exists(redisKey)).toBe(1);
+
+ // eslint-disable-next-line @typescript-eslint/no-misused-promises
+ setTimeout(async () => {
+ expect(await geocodingRedis.exists(redisKey)).toBe(0);
+ }, 3000);
+ });
+
+ it('Should send feedback to kafka also when no response was chosen', async function () {
+ const topic = config.get('outputTopic');
+ const requestId = crypto.randomUUID();
+
+ const geocodingResponse: GeocodingResponse = {
+ apiKey: '1',
+ site: 'test',
+ response: JSON.parse('["USA"]') as JSON,
+ respondedAt: new Date(),
+ };
+ await geocodingRedis.set(requestId, JSON.stringify(geocodingResponse));
+
+ await new Promise((resolve) => setTimeout(resolve, 3000));
+
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(mockKafkaProducer.send).toHaveBeenCalledWith({
+ topic,
+ messages: [
+ expect.objectContaining({
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
+ value: expect.stringContaining(`"requestId":"${requestId}"`),
+ }),
+ ],
+ });
+ });
});
describe('Bad Path', function () {
- it('should return 400 status code since the chosen_result_id is a string', async function () {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const feedbackModel: any = {
- request_id: '4ca82def-e73f-4b57-989b-3e285034b971',
+ it('Should return 400 status code since the chosen_result_id is a string', async function () {
+ const feedbackModel: unknown = {
+ requestId: crypto.randomUUID(),
chosen_result_id: '1',
user_id: 'user1@mycompany.net',
};
- // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
- const response = await requestSender.createFeedback(feedbackModel);
-
+ const response = await requestSender.createFeedback(feedbackModel as IFeedbackModel);
expect(response.status).toBe(httpStatusCodes.BAD_REQUEST);
});
- it('should return 400 status code because user_id is not valid', async function () {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const feedbackModel: any = {
- request_id: '4ca82def-e73f-4b57-989b-3e285034b971',
+ it('Should return 400 status code because user_id is not valid', async function () {
+ const feedbackModel: IFeedbackModel = {
+ request_id: crypto.randomUUID(),
chosen_result_id: 1,
user_id: 'user1',
};
- // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const response = await requestSender.createFeedback(feedbackModel);
-
expect(response.status).toBe(httpStatusCodes.BAD_REQUEST);
});
+
+ it('Should return 400 status code when redis is unavailable', async function () {
+ const mockRedis = {
+ get: jest.fn(),
+ } as unknown as jest.Mocked;
+
+ const mockLogger = {
+ error: jest.fn(),
+ info: jest.fn(),
+ } as unknown as jest.Mocked;
+
+ const requestId = crypto.randomUUID();
+
+ depContainer.register(SERVICES.GEOCODING_REDIS, { useValue: mockRedis });
+ depContainer.register(SERVICES.LOGGER, { useValue: mockLogger });
+
+ (mockRedis.get as jest.Mock).mockRejectedValue(new Error('Redis get failed'));
+
+ try {
+ await getNoChosenGeocodingResponse(requestId, mockLogger, mockRedis);
+ } catch (error) {
+ // eslint-disable-next-line jest/no-conditional-expect
+ expect(mockLogger.error).toHaveBeenCalledWith({ msg: 'Redis Error: Redis get failed' });
+ }
+ });
+
+ it('Should throw an error when uploading to Kafka fails', async function () {
+ const mockLogger = {
+ error: jest.fn(),
+ info: jest.fn(),
+ } as unknown as jest.Mocked;
+
+ const feedbackResponse: FeedbackResponse = {
+ requestId: crypto.randomUUID(),
+ chosenResultId: null,
+ userId: '',
+ responseTime: new Date(),
+ geocodingResponse: {
+ apiKey: '1',
+ site: 'test',
+ response: JSON.parse('["USA"]') as JSON,
+ respondedAt: new Date(),
+ },
+ };
+ mockKafkaProducer.send.mockRejectedValue(new Error('Error uploading to Kafka'));
+
+ try {
+ await send(feedbackResponse, mockLogger, config, mockKafkaProducer);
+ } catch (error) {
+ // eslint-disable-next-line jest/no-conditional-expect
+ expect(mockLogger.error).toHaveBeenCalledWith({
+ msg: 'Error uploading response to kafka',
+ message: feedbackResponse,
+ });
+ }
+ });
});
describe('Sad Path', function () {
- it('should return 404 status code since the feedback does not exist', async function () {
+ it('Should return 404 status code since the feedback does not exist', async function () {
const feedbackModel: IFeedbackModel = {
- request_id: '4ca82def-e73f-4b57-989b-3e285034b971',
+ request_id: crypto.randomUUID(),
chosen_result_id: 1,
user_id: 'user1@mycompany.net',
};
@@ -100,5 +202,27 @@ describe('feedback', function () {
expect(response.status).toBe(httpStatusCodes.NOT_FOUND);
});
+
+ it('Should return 404 status code when request is not found in redis', async function () {
+ const mockRedis = {
+ get: jest.fn(),
+ } as unknown as jest.Mocked;
+
+ const mockLogger = {
+ error: jest.fn(),
+ info: jest.fn(),
+ } as unknown as jest.Mocked;
+
+ depContainer.register(SERVICES.GEOCODING_REDIS, { useValue: mockRedis });
+ const requestId = crypto.randomUUID();
+
+ (mockRedis.get as jest.Mock).mockResolvedValue(null);
+
+ await expect(getNoChosenGeocodingResponse(requestId, mockLogger, mockRedis)).rejects.toThrow(
+ new NotFoundError(`The current request was not found ${requestId}`)
+ );
+
+ expect(mockLogger.error).not.toHaveBeenCalled();
+ });
});
});