Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ Geocoding's feedback API collects usage data from the [Geocoding API](https://gi
Checkout the OpenAPI spec [here](/openapi3.yaml)

## Workflow
![workflow img](https://github.com/user-attachments/assets/7ed767ad-02dd-46f3-9de7-d2be72205e2c)
![workflow img](https://github.com/user-attachments/assets/63b8c7ed-4509-4dea-87ae-dc0cfc625ff9)

## How does it work?
Once a Geocoding user searches for something using Geocoding's api, it enters Redis (in the geocoding DB index), and an event is triggered. This event adds the `requestId` (from geocoding) to a different Redis DB with a ttl of 5 minutes (TTL DB index).<br/><br/>
Once a Geocoding user searches for something using Geocoding's api, it enters Redis, and an event is triggered. This event adds to the `requestId` (from geocoding) a prefix, and adds it also to Redis with a ttl.<br/><br/>
When the Geocoding user chooses a result, the requesting system will then send the `request_id`, the `chosen_response_id`, and the `user_id` back to us using the [Feedback api](/openapi3.yaml).<br/>
Once we get the chosen response from the api, we validate it and make sure it exists in the Redis geocoding DB, and once it is validated we add a `wasUsed = true` parameter to the geocoding response (in Redis) for later use. We then send the response to Kafka (where it will later be enriched and added to elastic -> see [Geocoding Enrichment](https://github.com/MapColonies/geocoding-enrichment) for more details).<br/>
Once we get the chosen response from the api, we validate it and make sure it exists in the Redis, and once it is validated we add a `wasUsed = true` parameter to the geocoding response (in Redis) for later use. We then send the response to Kafka (where it will later be enriched and added to elastic -> see [Geocoding Enrichment](https://github.com/MapColonies/geocoding-enrichment) for more details).<br/>
Once the TTL of the `requestId` expires an even is triggered, and there are two options:<br/>
1. One or more responses were chosen.
2. No response was chosen (only searched for).
Expand Down
4 changes: 0 additions & 4 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@
"key": "REDIS_KEY_PATH",
"cert": "REDIS_CERT_PATH"
},
"databases": {
"geocodingIndex": "REDIS_GEOCODING_INDEX",
"ttlIndex": "REDIS_TTL_INDEX"
},
"ttl": {
"__name": "REDIS_TTL",
"__format": "number"
Expand Down
4 changes: 0 additions & 4 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
"key": "",
"cert": ""
},
"databases": {
"geocodingIndex": 0,
"ttlIndex": 1
},
"ttl": 300
},
"kafka": {
Expand Down
6 changes: 1 addition & 5 deletions config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
"key": "",
"cert": ""
},
"databases": {
"geocodingIndex": 2,
"ttlIndex": 3
},
"ttl": 2
},
"kafka": {
"brokers": "kafka:9092"
"brokers": "localhost:9092"
},
"kafkaConsumer": {},
"kafkaProducer": {},
Expand Down
1 change: 0 additions & 1 deletion helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ data:

{{- with .Values.redisConfig }}
REDIS_HOST: {{ .host }}
REDIS_DATABASE: {{ .database | quote}}
REDIS_PORT: {{ .port | quote }}
{{- if .sslAuth.enabled }}
REDIS_ENABLE_SSL_AUTH: "true"
Expand Down
1 change: 0 additions & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ redisConfig:
host: localhost
username: ""
password: ""
database: 0
port: 6379
sslAuth:
enabled: false
Expand Down
3 changes: 1 addition & 2 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ export const SERVICES = {
CONFIG: Symbol('Config'),
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
GEOCODING_REDIS: Symbol('GeocodingRedis'),
TTL_REDIS: Symbol('TTLRedis'),
REDIS: Symbol('Redis'),
KAFKA: Symbol('Kafka'),
} satisfies Record<string, symbol>;
/* eslint-enable @typescript-eslint/naming-convention */
Expand Down
1 change: 0 additions & 1 deletion src/common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export type RedisConfig = {
port: number;
enableSslAuth: boolean;
sslPaths: { ca: string; cert: string; key: string };
databases: { geocodingIndex: number; ttlIndex: number };
} & RedisClientOptions;

export type KafkaOptions = {
Expand Down
64 changes: 13 additions & 51 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,14 @@
import { DependencyContainer } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { HealthCheck } from '@godaddy/terminus';
import { RedisClient } from '../redis/index';
import { SERVICES } from './constants';

export const healthCheckFactory = (container: DependencyContainer): HealthCheck => {
return async (): Promise<void> => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
const geocodingRedis = container.resolve<RedisClient>(SERVICES.GEOCODING_REDIS);
const ttlRedis = container.resolve<RedisClient>(SERVICES.TTL_REDIS);

const promises: Promise<unknown>[] = [];

promises.push(
new Promise((resolve, reject) => {
geocodingRedis
.ping()
.then(() => {
resolve('Healthcheck passed for GeocodingRedis connection');
})
.catch((error: Error) => {
reject({
msg: `Healthcheck failed for GeocodingRedis.`,
error,
});
});
})
);
promises.push(
new Promise((resolve, reject) => {
ttlRedis
.ping()
.then(() => {
resolve('Healthcheck passed for TTLRedis connection');
})
.catch((error: Error) => {
reject({
msg: `Healthcheck failed for TTLRedis.`,
error,
});
});
})
);

await Promise.allSettled(promises).then((results) => {
results.forEach((msg) => logger.debug({ msg }));
});

return Promise.resolve();
};
import { TimeoutError } from './errors';

export const promiseTimeout = async <T>(ms: number, promise: Promise<T>): Promise<T> => {
// create a promise that rejects in <ms> milliseconds
const timeout = new Promise<T>((_, reject) => {
const id = setTimeout(() => {
clearTimeout(id);
reject(new TimeoutError(`Timed out in + ${ms} + ms.`));
}, ms);
});

// returns a race between our timeout and the passed in promise
return Promise.race([promise, timeout]);
};
12 changes: 6 additions & 6 deletions src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import { CLEANUP_REGISTRY, HEALTHCHECK, ON_SIGNAL, REDIS_CLIENT_FACTORY, REDIS_S
import { tracing } from './common/tracing';
import { feedbackRouterFactory, FEEDBACK_ROUTER_SYMBOL } from './feedback/routes/feedbackRouter';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { RedisClientFactory } from './redis';
import { healthCheckFunctionFactory, RedisClient, RedisClientFactory } from './redis';
import { kafkaClientFactory } from './kafka';
import { redisSubscribe } from './redis/subscribe';
import { healthCheckFactory } from './common/utils';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand Down Expand Up @@ -80,8 +79,8 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
postInjectionHook: async (deps: DependencyContainer): Promise<void> => {
const redisFactory = deps.resolve<RedisClientFactory>(REDIS_CLIENT_FACTORY);

for (const redisIndex of [SERVICES.GEOCODING_REDIS, SERVICES.TTL_REDIS, REDIS_SUB]) {
const redis = redisFactory.createRedisClient(redisIndex);
for (const redisIndex of [SERVICES.REDIS, REDIS_SUB]) {
const redis = redisFactory.createRedisClient();
deps.register(redisIndex, { useValue: redis });
cleanupRegistry.register({
func: async (): Promise<void> => {
Expand All @@ -105,8 +104,9 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
{
token: HEALTHCHECK,
provider: {
useFactory: (depContainer): HealthCheck => {
return healthCheckFactory(depContainer);
useFactory: (container): HealthCheck => {
const redis = container.resolve<RedisClient>(SERVICES.REDIS);
return healthCheckFunctionFactory(redis);
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions src/feedback/models/feedbackManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { IFeedbackModel } from './feedback';
export class FeedbackManager {
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.GEOCODING_REDIS) private readonly geocodingRedis: RedisClient,
@inject(SERVICES.REDIS) private readonly redisClient: RedisClient,
@inject(SERVICES.KAFKA) private readonly kafkaProducer: Producer,
@inject(SERVICES.CONFIG) private readonly config: IConfig
) {}
Expand All @@ -32,7 +32,7 @@ export class FeedbackManager {
responseTime: new Date(),
geocodingResponse: await this.getGeocodingResponse(requestId, userId, apiKey),
};
await this.geocodingRedis.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse));
await this.redisClient.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse));

this.logger.info({ msg: 'creating feedback', requestId });
await this.send(feedbackResponse);
Expand All @@ -41,7 +41,7 @@ export class FeedbackManager {

public async getGeocodingResponse(requestId: string, userId: string, apiKey: string): Promise<GeocodingResponse> {
try {
const redisResponse = (await this.geocodingRedis.get(requestId)) as string;
const redisResponse = (await this.redisClient.get(requestId)) as string;
if (redisResponse) {
const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
geocodingResponse.userId = userId;
Expand Down
28 changes: 17 additions & 11 deletions src/redis/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { readFileSync } from 'fs';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { HealthCheck } from '@godaddy/terminus';
import { createClient, RedisClientOptions } from 'redis';
import { SERVICES } from '../common/constants';
import { RedisConfig, IConfig } from '../common/interfaces';
import { promiseTimeout } from '../common/utils';

@injectable()
export class RedisClientFactory {
public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {}

public createConnectionOptions(redisConfig: RedisConfig, isGeocodingRedis: boolean): Partial<RedisClientOptions> {
const { host, port, enableSslAuth, sslPaths, databases, ...clientOptions } = redisConfig;
public createConnectionOptions(redisConfig: RedisConfig): Partial<RedisClientOptions> {
const { host, port, enableSslAuth, sslPaths, ...clientOptions } = redisConfig;
clientOptions.socket = { host, port };
if (enableSslAuth) {
clientOptions.socket = {
Expand All @@ -21,19 +23,12 @@ export class RedisClientFactory {
ca: sslPaths.ca !== '' ? readFileSync(sslPaths.ca) : undefined,
};
}
if (isGeocodingRedis) {
clientOptions.database = databases.geocodingIndex;
} else {
clientOptions.database = databases.ttlIndex;
}
return clientOptions;
}

public createRedisClient(redisIndex: symbol): RedisClient {
public createRedisClient(): RedisClient {
const dbConfig = this.config.get<RedisConfig>('redis');

const isGeocodingRedis: boolean = redisIndex === SERVICES.GEOCODING_REDIS;
const connectionOptions = this.createConnectionOptions(dbConfig, isGeocodingRedis);
const connectionOptions = this.createConnectionOptions(dbConfig);

const redisClient = createClient(connectionOptions)
.on('error', (error: Error) => this.logger.error({ msg: 'redis client errored', err: error }))
Expand All @@ -47,3 +42,14 @@ export class RedisClientFactory {
}

export type RedisClient = ReturnType<typeof createClient>;

export const CONNECTION_TIMEOUT = 5000;

export const healthCheckFunctionFactory = (redis: RedisClient): HealthCheck => {
return async (): Promise<void> => {
const check = redis.ping().then(() => {
return;
});
return promiseTimeout<void>(CONNECTION_TIMEOUT, check);
};
};
50 changes: 28 additions & 22 deletions src/redis/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfac
import { NotFoundError } from '../common/errors';
import { RedisClient } from '../redis/index';

const TTL_PREFIX = 'ttl_';

export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise<void> => {
const topic = config.get<string>('outputTopic');
logger.info(`Kafka send message. Topic: ${topic}`);
Expand All @@ -22,35 +24,39 @@ export const send = async (message: FeedbackResponse, logger: Logger, config: IC
};

export const redisSubscribe = async (deps: DependencyContainer): Promise<RedisClient> => {
const geocodingRedis = deps.resolve<RedisClient>(SERVICES.GEOCODING_REDIS);
const ttlRedis = deps.resolve<RedisClient>(SERVICES.TTL_REDIS);
const redisClient = deps.resolve<RedisClient>(SERVICES.REDIS);
const config = deps.resolve<IConfig>(SERVICES.CONFIG);
const kafkaProducer = deps.resolve<Producer>(SERVICES.KAFKA);
const logger = deps.resolve<Logger>(SERVICES.LOGGER);
const subscriber = deps.resolve<RedisClient>(REDIS_SUB);

logger.debug('Redis subscriber init');
const ttlDB = config.get<number>('redis.databases.ttlIndex');
const geocodingDB = config.get<number>('redis.databases.geocodingIndex');
const redisTTL = config.get<number>('redis.ttl');

await subscriber.subscribe(`__keyevent@${geocodingDB}__:set`, async (message) => {
logger.info(`Redis: Got new request ${message}`);
// eslint-disable-next-line @typescript-eslint/naming-convention
await ttlRedis.set(message, '', { EX: redisTTL });
await subscriber.subscribe(`__keyevent@0__:set`, async (message) => {
if (!message.startsWith(TTL_PREFIX)) {
logger.info(`Redis: Got new request ${message}`);
const ttlMessage = TTL_PREFIX + message;
// eslint-disable-next-line @typescript-eslint/naming-convention
await redisClient.set(ttlMessage, '', { EX: redisTTL });
}
});

await subscriber.subscribe(`__keyevent@${ttlDB}__:expired`, async (message: string) => {
let wasUsed;
const redisResponse = (await geocodingRedis.get(message)) as string;
if (redisResponse) {
const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
wasUsed = geocodingResponse.wasUsed;
}
if (!(wasUsed ?? false)) {
await sendNoChosenResult(message, logger, config, kafkaProducer, geocodingRedis);
await subscriber.subscribe(`__keyevent@0__:expired`, async (message: string) => {
if (message.startsWith(TTL_PREFIX)) {
const geocodingMessage = message.substring(TTL_PREFIX.length);

let wasUsed;
const redisResponse = (await redisClient.get(geocodingMessage)) as string;
if (redisResponse) {
const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
wasUsed = geocodingResponse.wasUsed;
}
if (!(wasUsed ?? false)) {
await sendNoChosenResult(geocodingMessage, logger, config, kafkaProducer, redisClient);
}
await redisClient.del(geocodingMessage);
}
await geocodingRedis.del(message);
});
return subscriber;
};
Expand All @@ -60,21 +66,21 @@ export const sendNoChosenResult = async (
logger: Logger,
config: IConfig,
kafkaProducer: Producer,
geocodingRedis: RedisClient
redisClient: RedisClient
): Promise<void> => {
const feedbackResponse: FeedbackResponse = {
requestId,
chosenResultId: null,
userId: '',
responseTime: new Date(),
geocodingResponse: await getNoChosenGeocodingResponse(requestId, logger, geocodingRedis),
geocodingResponse: await getNoChosenGeocodingResponse(requestId, logger, redisClient),
};
await send(feedbackResponse, logger, config, kafkaProducer);
};

export const getNoChosenGeocodingResponse = async (requestId: string, logger: Logger, geocodingRedis: RedisClient): Promise<GeocodingResponse> => {
export const getNoChosenGeocodingResponse = async (requestId: string, logger: Logger, redisClient: RedisClient): Promise<GeocodingResponse> => {
try {
const redisResponse = await geocodingRedis.get(requestId);
const redisResponse = await redisClient.get(requestId);
if (redisResponse != null) {
const geocodingResponse = JSON.parse(redisResponse) as GeocodingResponse;
return geocodingResponse;
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/docs/docs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ describe('docs', function () {
override: [
{ token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } },
{ token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } },
{ token: SERVICES.GEOCODING_REDIS, provider: { useValue: {} } },
{ token: SERVICES.TTL_REDIS, provider: { useValue: {} } },
{ token: SERVICES.REDIS, provider: { useValue: {} } },
{ token: REDIS_SUB, provider: { useValue: {} } },
{ token: SERVICES.KAFKA, provider: { useValue: {} } },
],
Expand Down
Loading