Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ npm install
npx husky install
```

#### Make sure you have keyspace notifications in Redis
In `redis-cli`
```bash
CONFIG SET notify-keyspace-events KEA
```

## Run Locally

Clone the project
Expand Down
10 changes: 8 additions & 2 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@
"ttl": {
"__name": "REDIS_TTL",
"__format": "number"
}
},
"expiredResponseTtl": {
"__name": "REDIS_EXPIRED_RESPONSE_TTL",
"__format": "number"
},
"prefix": "REDIS_KEY_PREFIX"
},
"kafka": {
"brokers": "KAFKA_BROKERS",
Expand All @@ -77,6 +82,7 @@
"kafkaProducer": {},
"outputTopic": "KAFKA_OUTPUT_TOPIC",
"application": {
"userValidation": "USER_ID_DOMAIN"
"userValidation": "USER_ID_DOMAIN",
"__format": "json"
}
}
5 changes: 3 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"key": "",
"cert": ""
},
"ttl": 300
"ttl": 600,
"expiredResponseTtl": 300
},
"kafka": {
"brokers": "KAFKA_URL:9092",
Expand All @@ -51,6 +52,6 @@
"kafkaProducer": {},
"outputTopic": "topic",
"application": {
"userValidation": "@mycompany.net"
"userValidation": ["@mycompany.net"]
}
}
5 changes: 3 additions & 2 deletions config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"key": "",
"cert": ""
},
"ttl": 2
"ttl": 600,
"expiredResponseTtl": 2
},
"kafka": {
"brokers": "localhost:9092"
Expand All @@ -19,6 +20,6 @@
"kafkaProducer": {},
"outputTopic": "testTopic",
"application": {
"userValidation": "@mycompany.net"
"userValidation": ["@mycompany.net"]
}
}
16 changes: 11 additions & 5 deletions src/feedback/models/feedbackManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@ export class FeedbackManager {
public async createFeedback(feedback: IFeedbackModel, apiKey: string): Promise<FeedbackResponse> {
const requestId = feedback.request_id;
const userId = feedback.user_id;
const userValidation = this.config.get<string>('application.userValidation');
const userValidation = this.config.get<string[]>('application.userValidation');
const ttl = this.config.get<number>('redis.ttl');
const prefix = this.config.has('redis.prefix') ? this.config.get<string>('redis.prefix') : undefined;

if (!userId.endsWith(userValidation)) {
throw new BadRequestError(`user_id not valid. valid user_id ends with "${userValidation}"`);
const validateUser = !userValidation.some((validEnding) => validEnding && userId.endsWith(validEnding));
if (validateUser) {
throw new BadRequestError(`user_id not valid. valid user_id ends with "${JSON.stringify(userValidation)}"`);
}

const fullRequestId = prefix !== undefined ? `${prefix}:${requestId}` : requestId;

const feedbackResponse: FeedbackResponse = {
requestId: requestId,
chosenResultId: feedback.chosen_result_id,
userId: userId,
responseTime: new Date(),
geocodingResponse: await this.getGeocodingResponse(requestId, userId, apiKey),
geocodingResponse: await this.getGeocodingResponse(fullRequestId, userId, apiKey),
};
await this.redisClient.set(requestId, JSON.stringify(feedbackResponse.geocodingResponse));

await this.redisClient.setEx(fullRequestId, ttl, JSON.stringify(feedbackResponse.geocodingResponse));

this.logger.info({ msg: 'creating feedback', requestId });
await this.send(feedbackResponse);
Expand Down
16 changes: 10 additions & 6 deletions src/redis/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { IConfig, FeedbackResponse, GeocodingResponse } from '../common/interfac
import { NotFoundError } from '../common/errors';
import { RedisClient } from '../redis/index';

const TTL_PREFIX = 'ttl_';
const TTL_PREFIX = 'ttl:';

export const send = async (message: FeedbackResponse, logger: Logger, config: IConfig, kafkaProducer: Producer): Promise<void> => {
const topic = config.get<string>('outputTopic');
Expand All @@ -31,20 +31,24 @@ export const redisSubscribe = async (deps: DependencyContainer): Promise<RedisCl
const subscriber = deps.resolve<RedisClient>(REDIS_SUB);

logger.debug('Redis subscriber init');
const redisTTL = config.get<number>('redis.ttl');
const redisTTL = config.get<number>('redis.expiredResponseTtl');
const redisPrefix = config.has('redis.prefix') ? config.get<string>('redis.prefix') : undefined;

const prefixWithTtl = redisPrefix !== undefined ? `${redisPrefix}:${TTL_PREFIX}` : TTL_PREFIX;
await subscriber.subscribe(`__keyevent@0__:set`, async (message) => {
if (!message.startsWith(TTL_PREFIX)) {
if (!message.startsWith(prefixWithTtl)) {
logger.info(`Redis: Got new request ${message}`);
const ttlMessage = TTL_PREFIX + message;

const noPrefixMessage = redisPrefix !== undefined ? message.split(':')[1] : message;
const ttlMessage = prefixWithTtl + noPrefixMessage;
// eslint-disable-next-line @typescript-eslint/naming-convention
await redisClient.set(ttlMessage, '', { EX: redisTTL });
}
});

await subscriber.subscribe(`__keyevent@0__:expired`, async (message: string) => {
if (message.startsWith(TTL_PREFIX)) {
const geocodingMessage = message.substring(TTL_PREFIX.length);
if (message.startsWith(prefixWithTtl)) {
const geocodingMessage = message.substring(prefixWithTtl.length);

let wasUsed;
const redisResponse = (await redisClient.get(geocodingMessage)) as string;
Expand Down
86 changes: 79 additions & 7 deletions tests/integration/feedback/feedback.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/* eslint-disable @typescript-eslint/naming-convention */
import * as crypto from 'node:crypto';
import config from 'config';
import config, { IConfig } from 'config';
import jsLogger, { Logger } from '@map-colonies/js-logger';
import { DependencyContainer } from 'tsyringe';
import { Producer } from 'kafkajs';
import { trace } from '@opentelemetry/api';
import httpStatusCodes from 'http-status-codes';
import { CleanupRegistry } from '@map-colonies/cleanup-registry';
import { getApp } from '../../../src/app';
import { CLEANUP_REGISTRY, SERVICES } from '../../../src/common/constants';
import { CLEANUP_REGISTRY, REDIS_SUB, SERVICES } from '../../../src/common/constants';
import { IFeedbackModel } from '../../../src/feedback/models/feedback';
import { FeedbackResponse, GeocodingResponse } from '../../../src/common/interfaces';
import { RedisClient } from '../../../src/redis';
Expand Down Expand Up @@ -69,6 +69,7 @@ describe('feedback', function () {
});

it('Redis key should not exist in geocodingIndex after TTL has passed', async function () {
const redisTtl = config.get<number>('redis.expiredResponseTtl');
const geocodingResponse: GeocodingResponse = {
apiKey: '1',
site: 'test',
Expand All @@ -80,15 +81,14 @@ describe('feedback', function () {
await redisClient.set(redisKey, JSON.stringify(geocodingResponse));
expect(await redisClient.exists(redisKey)).toBe(1);

// eslint-disable-next-line @typescript-eslint/no-misused-promises
setTimeout(async () => {
expect(await redisClient.exists(redisKey)).toBe(0);
}, 3000);
await new Promise((resolve) => setTimeout(resolve, (redisTtl + 1) * 1000));
expect(await redisClient.exists(redisKey)).toBe(0);
});

it('Should send feedback to kafka also when no response was chosen', async function () {
const topic = config.get<string>('outputTopic');
const requestId = crypto.randomUUID();
const redisTtl = config.get<number>('redis.expiredResponseTtl');

const geocodingResponse: GeocodingResponse = {
apiKey: '1',
Expand All @@ -98,7 +98,7 @@ describe('feedback', function () {
};
await redisClient.set(requestId, JSON.stringify(geocodingResponse));

await new Promise((resolve) => setTimeout(resolve, 3000));
await new Promise((resolve) => setTimeout(resolve, (redisTtl + 1) * 1000));

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockKafkaProducer.send).toHaveBeenCalledWith({
Expand All @@ -111,6 +111,78 @@ describe('feedback', function () {
],
});
});

describe('Redis uses prefix key', () => {
it('should return 200 status code and add key to Redis with prefix', async function () {
const realConfig = depContainer.resolve<IConfig>(SERVICES.CONFIG);
const prefix = 'test-prefix-item';

const configWithPrefix: IConfig = {
...realConfig,
get<T>(key: string): T {
if (key === 'redis.prefix') {
return prefix as unknown as T;
}
if (key === 'redis') {
const realRedisConfig = realConfig.get<RedisClient>('redis');
return { ...realRedisConfig, prefix } as T;
}
return realConfig.get<T>(key);
},
has(key: string): boolean {
if (key === 'redis.prefix') {
return true;
}
return realConfig.has(key);
},
};

const mockRegisterOptions = {
override: [
{ token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } },
{ token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } },
{ token: SERVICES.KAFKA, provider: { useValue: mockKafkaProducer } },
{ token: SERVICES.CONFIG, provider: { useValue: configWithPrefix } },
],
useChild: true,
};
const { app: mockApp, container: localContainer } = await getApp(mockRegisterOptions);
const localRequestSender = new FeedbackRequestSender(mockApp);

const redisConnection = localContainer.resolve<RedisClient>(SERVICES.REDIS);

const geocodingResponse: GeocodingResponse = {
userId: '1',
apiKey: '1',
site: 'test',
response: JSON.parse('["USA"]') as JSON,
respondedAt: new Date('2024-08-29T14:39:10.602Z'),
};
const redisKey = crypto.randomUUID();
await redisConnection.set(`${prefix}:${redisKey}`, JSON.stringify(geocodingResponse));

const feedbackModel: IFeedbackModel = {
request_id: redisKey,
chosen_result_id: 3,
user_id: 'user1@mycompany.net',
};
const response = await localRequestSender.createFeedback(feedbackModel);

const keys = await redisConnection.keys(prefix + '*');
expect(keys.length).toBeGreaterThanOrEqual(1);
expect(response.status).toBe(httpStatusCodes.NO_CONTENT);

await redisConnection.del(`${prefix}:${redisKey}`);
const subscriber = localContainer.resolve<RedisClient>(REDIS_SUB);

await subscriber.unsubscribe('__keyevent@0__:set');
await subscriber.unsubscribe('__keyevent@0__:expired');

const localCleanup = localContainer.resolve<CleanupRegistry>(CLEANUP_REGISTRY);
await localCleanup.trigger();
localContainer.reset();
});
});
});

describe('Bad Path', function () {
Expand Down
Loading