Skip to content

Commit 5da8a36

Browse files
committed
reduce pg pressure for dedupe tasks
1 parent 332d4fc commit 5da8a36

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

packages/services/workflows/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"@hive/service-common": "workspace:*",
1414
"@hive/storage": "workspace:*",
1515
"@sentry/node": "7.120.2",
16+
"bentocache": "1.1.0",
1617
"dotenv": "16.4.7",
1718
"graphile-worker": "0.16.6",
1819
"nodemailer": "7.0.11",

packages/services/workflows/src/kit.ts

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { BentoCache, bentostore } from 'bentocache';
2+
import { memoryDriver } from 'bentocache/build/src/drivers/memory';
13
import { makeWorkerUtils, WorkerUtils, type JobHelpers, type Task } from 'graphile-worker';
24
import type { Pool } from 'pg';
35
import { z } from 'zod';
@@ -71,6 +73,8 @@ export function implementTask<TPayload>(
7173
*/
7274
export class TaskScheduler {
7375
tools: Promise<WorkerUtils>;
76+
cache: BentoCache<{ store: ReturnType<typeof bentostore> }>;
77+
7478
constructor(
7579
pgPool: Pool,
7680
private logger: Logger = new Logger(),
@@ -79,6 +83,17 @@ export class TaskScheduler {
7983
pgPool,
8084
logger: bridgeGraphileLogger(logger),
8185
});
86+
this.cache = new BentoCache({
87+
default: 'taskSchedule',
88+
stores: {
89+
taskSchedule: bentostore().useL1Layer(
90+
memoryDriver({
91+
maxItems: 10_000,
92+
prefix: 'bentocache:graphile_worker_deduplication',
93+
}),
94+
),
95+
},
96+
});
8297
}
8398

8499
async scheduleTask<TPayload>(
@@ -111,21 +126,27 @@ export class TaskScheduler {
111126
typeof opts.dedupe.key === 'string' ? opts.dedupe.key : opts.dedupe.key(payload);
112127
const expiresAt = new Date(new Date().getTime() + opts.dedupe.ttl).toISOString();
113128

114-
const shouldSkip = await tools.withPgClient(async client => {
115-
const result = await client.query(
116-
`
117-
INSERT INTO "graphile_worker_deduplication" ("task_name", "dedupe_key", "expires_at")
118-
VALUES($1, $2, $3)
119-
ON CONFLICT ("task_name", "dedupe_key")
120-
DO
121-
UPDATE SET "expires_at" = EXCLUDED.expires_at
122-
WHERE "graphile_worker_deduplication"."expires_at" < NOW()
123-
RETURNING xmax = 0 AS "inserted"
124-
`,
125-
[taskDefinition.name, dedupeKey, expiresAt],
126-
);
127-
128-
return result.rows.length === 0;
129+
const shouldSkip = await this.cache.getOrSet({
130+
key: `${taskDefinition.name}:${dedupeKey}`,
131+
ttl: opts.dedupe.ttl,
132+
async factory() {
133+
return await tools.withPgClient(async client => {
134+
const result = await client.query(
135+
`
136+
INSERT INTO "graphile_worker_deduplication" ("task_name", "dedupe_key", "expires_at")
137+
VALUES($1, $2, $3)
138+
ON CONFLICT ("task_name", "dedupe_key")
139+
DO
140+
UPDATE SET "expires_at" = EXCLUDED.expires_at
141+
WHERE "graphile_worker_deduplication"."expires_at" < NOW()
142+
RETURNING xmax = 0 AS "inserted"
143+
`,
144+
[taskDefinition.name, dedupeKey, expiresAt],
145+
);
146+
147+
return result.rows.length === 0;
148+
});
149+
},
129150
});
130151

131152
if (shouldSkip) {

0 commit comments

Comments
 (0)