diff --git a/packages/core/lib/explorer.ts b/packages/core/lib/explorer.ts index 416a219..f4c8546 100644 --- a/packages/core/lib/explorer.ts +++ b/packages/core/lib/explorer.ts @@ -7,6 +7,9 @@ import { GenericFunction } from './interfaces'; import { JOB_NAME, JOB_OPTIONS } from './queue/constants'; import { QueueMetadata } from './queue/metadata'; import { Injectable } from './foundation'; +import { REFILL_INTERVAL, TOKEN_COUNT } from './limiter/constants'; +import { ulid } from 'ulid'; +import { Limiter } from './limiter'; @Injectable() export class IntentExplorer { @@ -98,4 +101,23 @@ export class IntentExplorer { CommandMeta.setCommand(command, options, methodRef.bind(instance)); } + + lookupLimittedMethods( + instance: Record, + key: string, + ) { + let methodRef = instance[key]; + const hasCommandMeta = Reflect.hasMetadata(TOKEN_COUNT, instance, key); + + if (!hasCommandMeta) return; + + const tokensCount = Reflect.getMetadata(TOKEN_COUNT, instance, key); + const frequency = Reflect.getMetadata(REFILL_INTERVAL, instance, key); + const funcKey = ulid(); + Limiter.initializeToken(key + funcKey, tokensCount, frequency); + instance[key] = function (...args) { + Limiter.useToken(key + funcKey); + return methodRef.apply(instance, args); + }; + } } diff --git a/packages/core/lib/limiter/constants.ts b/packages/core/lib/limiter/constants.ts new file mode 100644 index 0000000..b2ed399 --- /dev/null +++ b/packages/core/lib/limiter/constants.ts @@ -0,0 +1,2 @@ +export const TOKEN_COUNT = '__TOKEN_COUNT__'; +export const REFILL_INTERVAL = '__REFILL_INTERVAL__'; diff --git a/packages/core/lib/limiter/decorator.ts b/packages/core/lib/limiter/decorator.ts new file mode 100644 index 0000000..abd3a87 --- /dev/null +++ b/packages/core/lib/limiter/decorator.ts @@ -0,0 +1,15 @@ +import { REFILL_INTERVAL, TOKEN_COUNT } from './constants'; + +export const Limit = (tokens: number, seconds: number) => { + console.log('first(): factory evaluated'); + return function ( + target: any, + propertyKey: string, + descriptor: PropertyDescriptor, + ) { + Reflect.defineMetadata(TOKEN_COUNT, tokens, target, propertyKey); + Reflect.defineMetadata(REFILL_INTERVAL, seconds, target, propertyKey); + + return descriptor; + }; +}; diff --git a/packages/core/lib/limiter/drivers/index.ts b/packages/core/lib/limiter/drivers/index.ts new file mode 100644 index 0000000..a187a39 --- /dev/null +++ b/packages/core/lib/limiter/drivers/index.ts @@ -0,0 +1,2 @@ +export * from './memory'; +export * from './redis'; diff --git a/packages/core/lib/limiter/drivers/memory.ts b/packages/core/lib/limiter/drivers/memory.ts new file mode 100644 index 0000000..92e6934 --- /dev/null +++ b/packages/core/lib/limiter/drivers/memory.ts @@ -0,0 +1,75 @@ +import { LimiterDriver } from '../interfaces/limiterDriver'; + +export class MemoryDriver implements LimiterDriver { + private static keyCounts = {}; + private static keyScores = {}; + private options = { prefix: 'in-memory-keys' }; + + async setCounter( + key: string, + value: number, + ttlInSec?: number, + ): Promise { + await this.set(key, value, ttlInSec); + } + + async incrementCounter(key: string): Promise { + MemoryDriver.keyCounts[this.storeKey(key)] += 1; + } + + async decrementCounter(key: string): Promise { + if ([undefined, 0].includes(await this.getCount(key))) { + return false; + } + MemoryDriver.keyCounts[this.storeKey(key)] -= 1; + return true; + } + + async delScoresLessThan(key: string, val: number): Promise { + let ele = MemoryDriver.keyScores[this.storeKey(key)]?.[0]; + if (!ele) return; + while (ele && ele < val) { + delete MemoryDriver.keyScores[this.storeKey(key)]?.[0]; + ele = MemoryDriver.keyScores[this.storeKey(key)]?.[0]; + } + } + + async addNewScore(key: string, val: number) { + if (!MemoryDriver.keyScores[this.storeKey(key)]) { + MemoryDriver.keyScores[this.storeKey(key)] = []; + } + return MemoryDriver.keyScores[this.storeKey(key)].append(val); + } + + async getScoresCount(key: string): Promise { + return MemoryDriver.keyScores[this.storeKey(key)]?.length ?? 0; + } + async getCount(key: string): Promise { + return MemoryDriver.keyCounts[this.storeKey(key)]; + } + + private async get(key: string): Promise { + const value = MemoryDriver.keyCounts[this.storeKey(key)]; + if (!value) return null; + return value; + } + + private async set( + key: string, + value: number, + ttlInSec?: number, + ): Promise { + const redisKey = this.storeKey(key); + MemoryDriver.keyCounts[redisKey] = value; + if (ttlInSec) { + setTimeout(() => { + delete MemoryDriver.keyCounts[redisKey]; + }, ttlInSec * 1000); + } + return true; + } + + private storeKey(key: string): string { + return `${this.options.prefix}:::${key}`; + } +} diff --git a/packages/core/lib/limiter/drivers/redis.ts b/packages/core/lib/limiter/drivers/redis.ts new file mode 100644 index 0000000..1940a8f --- /dev/null +++ b/packages/core/lib/limiter/drivers/redis.ts @@ -0,0 +1,93 @@ +import { RedisDriverOption } from '../../cache'; +import { Package } from '../../utils'; +import { LimiterDriver } from '../interfaces/limiterDriver'; + +export class RedisDriver implements LimiterDriver { + private client: any; + + constructor(private options: RedisDriverOption) { + const IORedis = Package.load('ioredis'); + if (options.url) { + this.client = new IORedis(options.url, { db: options.database || 0 }); + } else { + this.client = new IORedis({ + host: options.host, + port: options.port, + username: options.username, + password: options.password, + db: options.database, + }); + } + } + + async setCounter( + key: string, + value: number, + ttlInSec?: number, + ): Promise { + await this.set(key, value, ttlInSec); + } + + async incrementCounter(key: string): Promise { + await this.client.incr(this.storeKey(key)); + } + + async decrementCounter(key: string): Promise { + if ([undefined, 0].includes(+(await this.get(key)))) { + return false; + } + await this.client.decr(this.storeKey(key)); + return true; + } + + async delScoresLessThan(key: string, val: number): Promise { + let ele = await this.client.lindex(this.storeKey(key), 0); + if (!ele) return; + while (ele && ele < val) { + await this.client.lpop(this.storeKey(key), 0); + ele = await this.client.lindex(this.storeKey(key), 0); + } + } + + async addNewScore(key: string, val: number) { + await this.client.ladd(this.storeKey(key), val); + } + + async getScoresCount(key: string): Promise { + return await this.client.llen(this.storeKey(key)); + } + + async getCount(key: string): Promise { + return +(await this.client.get(key)); + } + + private async get(key: string): Promise { + const value = await this.client.get(this.storeKey(key)); + if (!value) return null; + try { + return JSON.parse(value); + } catch (e) { + return value; + } + } + + private async set( + key: string, + value: string | number | Record, + ttlInSec?: number, + ): Promise { + try { + const redisKey = this.storeKey(key); + ttlInSec + ? await this.client.set(redisKey, JSON.stringify(value), 'EX', ttlInSec) + : await this.client.set(redisKey, JSON.stringify(value)); + return true; + } catch { + return false; + } + } + + private storeKey(key: string): string { + return `${this.options.prefix}:::${key}`; + } +} diff --git a/packages/core/lib/limiter/index.ts b/packages/core/lib/limiter/index.ts new file mode 100644 index 0000000..651635c --- /dev/null +++ b/packages/core/lib/limiter/index.ts @@ -0,0 +1,4 @@ +export * from './rateLimiter'; +export * from './strategies'; +export * from './drivers'; +export * from './decorator'; diff --git a/packages/core/lib/limiter/interfaces/limiterDriver.ts b/packages/core/lib/limiter/interfaces/limiterDriver.ts new file mode 100644 index 0000000..89f934e --- /dev/null +++ b/packages/core/lib/limiter/interfaces/limiterDriver.ts @@ -0,0 +1,9 @@ +export interface LimiterDriver { + setCounter(key: string, value: number, ttlInSec?: number): Promise; + incrementCounter(key: string): Promise; + decrementCounter(key: string): Promise; + delScoresLessThan(key: string, val: number): Promise; + addNewScore(key: string, val: number): void; + getScoresCount(key: string): Promise; + getCount(key: string): Promise; +} diff --git a/packages/core/lib/limiter/interfaces/options.ts b/packages/core/lib/limiter/interfaces/options.ts new file mode 100644 index 0000000..c93523a --- /dev/null +++ b/packages/core/lib/limiter/interfaces/options.ts @@ -0,0 +1,33 @@ +import { MemoryDriver, RedisDriver } from '../drivers'; + +export interface LimiterOptions { + isGlobal?: boolean; + driver: LimiterDriverType; + defaultTokensCount?: number; + defaultRefillIntervalInSeconds?: number; + connection?: RedisConnection; +} + +export interface RedisConnection { + host: string; + port: number; + database: number; + password: string; +} + +export enum LimiterDriverType { + REDIS = 'redis', + IN_MEMORY = 'in-memory', +} + +export const defaultOptions = { + isGlobal: true, + driver: LimiterDriverType.IN_MEMORY, + defaultTokensCount: 40, + defaultRefillIntervalInSeconds: 10, +}; + +export const DriversMap = { + [LimiterDriverType.REDIS]: RedisDriver, + [LimiterDriverType.IN_MEMORY]: MemoryDriver, +}; diff --git a/packages/core/lib/limiter/rateLimiter.ts b/packages/core/lib/limiter/rateLimiter.ts new file mode 100644 index 0000000..d3d53f9 --- /dev/null +++ b/packages/core/lib/limiter/rateLimiter.ts @@ -0,0 +1,41 @@ +import { LimiterDriver } from './interfaces/limiterDriver'; +import { BaseStrategy } from './strategies/baseStrategy'; +import { Injectable } from '@nestjs/common'; +import { ConfigService } from '../config/service'; +import { + DriversMap, + LimiterDriverType, +} from './interfaces/options'; + +@Injectable() +export class Limiter { + private static driver: LimiterDriver; + private static strategy: BaseStrategy; + constructor(private config: ConfigService) { + const options = this.config.get('limiter'); + if(!options) return + switch (options.driver) { + case LimiterDriverType.REDIS: { + Limiter.driver = new DriversMap[LimiterDriverType.REDIS]( + options.connection, + ); + } + default: { + Limiter.driver = new DriversMap[LimiterDriverType.IN_MEMORY](); + } + } + Limiter.strategy = new BaseStrategy(Limiter.driver); + } + + static initializeToken = ( + key: string, + tokensCount: number, + intervalInSeconds: number, + ) => { + Limiter.strategy.initializeToken(key, tokensCount, intervalInSeconds); + }; + + static useToken = (key: string) => { + Limiter.strategy.useToken(key); + }; +} diff --git a/packages/core/lib/limiter/strategies/baseStrategy.ts b/packages/core/lib/limiter/strategies/baseStrategy.ts new file mode 100644 index 0000000..1d108cd --- /dev/null +++ b/packages/core/lib/limiter/strategies/baseStrategy.ts @@ -0,0 +1,37 @@ +import { GenericException } from '../../exceptions'; +import { LimiterDriver } from '../interfaces/limiterDriver'; + +export class BaseStrategy { + protected static tokensQuota = {}; + protected static tokensIntervals = {}; + + constructor(protected driver: LimiterDriver) { + this.driver = driver; + } + + initializeToken = async ( + key: string, + tokensCount: number, + intervalInSeconds: number, + ) => { + BaseStrategy.tokensQuota[key] = tokensCount; + BaseStrategy.tokensIntervals[key] = intervalInSeconds; + await this.driver.setCounter(key, tokensCount, intervalInSeconds); + }; + + useToken = async (key: string) => { + if ((await this.driver.getCount(key)) == undefined) { + await this.initializeToken( + key, + BaseStrategy.tokensQuota[key] - 1, + BaseStrategy.tokensIntervals[key], + ); + return; + } + const cut = await this.driver.decrementCounter(key); + console.log('current Count', await this.driver.getCount(key), cut); + if (!cut) { + throw new GenericException('Cannot be called.'); + } + }; +} diff --git a/packages/core/lib/limiter/strategies/index.ts b/packages/core/lib/limiter/strategies/index.ts new file mode 100644 index 0000000..dbddbe4 --- /dev/null +++ b/packages/core/lib/limiter/strategies/index.ts @@ -0,0 +1,4 @@ +export * from './baseStrategy'; +export * from './slidingWindowCounter'; +export * from './tokenBucket'; +export * from './windowCounter'; diff --git a/packages/core/lib/limiter/strategies/slidingWindowCounter.ts b/packages/core/lib/limiter/strategies/slidingWindowCounter.ts new file mode 100644 index 0000000..3a6b5db --- /dev/null +++ b/packages/core/lib/limiter/strategies/slidingWindowCounter.ts @@ -0,0 +1,35 @@ +import { GenericException } from '../../exceptions'; +import { LimiterDriver } from '../interfaces/limiterDriver'; +import { BaseStrategy } from './baseStrategy'; + +export class SlidingWindowCounter extends BaseStrategy { + constructor(driver: LimiterDriver) { + super(driver); + } + + initializeToken = async ( + key: string, + tokensCount: number, + intervalInSeconds: number, + ) => { + SlidingWindowCounter.tokensQuota[key] = tokensCount; + SlidingWindowCounter.tokensIntervals[key] = intervalInSeconds; + await this.driver.setCounter(key, tokensCount); + }; + + useToken = async (key: string) => { + const current = new Date().valueOf(); + await this.driver.delScoresLessThan(key, current); + + if ( + (await this.driver.getScoresCount(key)) >= BaseStrategy.tokensQuota[key] + ) { + throw new GenericException('Cannot be called.'); + } + await this.driver.addNewScore(key, current); + }; + + setTimer = () => { + // this.setTokens(); + }; +} diff --git a/packages/core/lib/limiter/strategies/tokenBucket.ts b/packages/core/lib/limiter/strategies/tokenBucket.ts new file mode 100644 index 0000000..2a8bcbb --- /dev/null +++ b/packages/core/lib/limiter/strategies/tokenBucket.ts @@ -0,0 +1,43 @@ +import { BaseStrategy } from './baseStrategy'; +import { LimiterDriver } from '../interfaces/limiterDriver'; +import { GenericException } from '../../exceptions'; + +export class TokenBucketStrategy extends BaseStrategy { + constructor(driver: LimiterDriver) { + super(driver); + } + + initializeToken = async ( + key: string, + tokensCount: number, + intervalInSeconds: number, + ) => { + TokenBucketStrategy.tokensQuota[key] = tokensCount; + TokenBucketStrategy.tokensIntervals[key] = intervalInSeconds; + await this.driver.setCounter(key, tokensCount); + }; + + useToken = async (key: string) => { + if (!(await this.driver.decrementCounter(key))) { + throw new GenericException('Cannot be called.'); + } + }; + + incrementTokens = async () => { + for (const key in TokenBucketStrategy.tokensQuota) { + if ( + (await this.driver.getCount(key)) == + TokenBucketStrategy.tokensQuota[key] + ) { + continue; + } + setInterval(async () => { + await this.driver.incrementCounter(key); + }, TokenBucketStrategy.tokensIntervals[key] * 1000); + } + }; + + setTimer = () => { + this.incrementTokens(); + }; +} diff --git a/packages/core/lib/limiter/strategies/windowCounter.ts b/packages/core/lib/limiter/strategies/windowCounter.ts new file mode 100644 index 0000000..7e4fb95 --- /dev/null +++ b/packages/core/lib/limiter/strategies/windowCounter.ts @@ -0,0 +1,43 @@ +import { BaseStrategy } from './baseStrategy'; +import { LimiterDriver } from '../interfaces/limiterDriver'; +import { GenericException } from '../../exceptions'; + +export class WindowCounterStrategy extends BaseStrategy { + constructor(driver: LimiterDriver) { + super(driver); + } + + initializeToken = async ( + key: string, + tokensCount: number, + intervalInSeconds: number, + ) => { + WindowCounterStrategy.tokensQuota[key] = tokensCount; + WindowCounterStrategy.tokensIntervals[key] = intervalInSeconds; + await this.driver.setCounter(key, tokensCount); + }; + + useToken = async (key: string) => { + if (!(await this.driver.decrementCounter(key))) { + throw new GenericException('Cannot be called.'); + } + }; + + resetTokens = async () => { + for (const key in BaseStrategy.tokensQuota) { + if ((await this.driver.getCount(key)) == BaseStrategy.tokensQuota[key]) { + continue; + } + setInterval(async () => { + await this.driver.setCounter( + key, + WindowCounterStrategy.tokensQuota[key], + ); + }, BaseStrategy.tokensIntervals[key] * 1000); + } + }; + + setTimer = () => { + this.resetTokens(); + }; +}