Skip to content

Commit 43524f0

Browse files
committed
schema purge
1 parent 222bfb9 commit 43524f0

File tree

10 files changed

+79
-211
lines changed

10 files changed

+79
-211
lines changed

packages/services/api/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export type {
1818
OrganizationBilling,
1919
OrganizationInvitation,
2020
} from './shared/entities';
21-
export { createTaskRunner } from './modules/shared/lib/task-runner';
2221
export { minifySchema } from './shared/schema';
2322
export { HiveError } from './shared/errors';
2423
export { ProjectType } from './__generated__/types';

packages/services/api/src/modules/shared/lib/task-runner.ts

Lines changed: 0 additions & 77 deletions
This file was deleted.

packages/services/server/src/index.ts

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import { z } from 'zod';
1616
import formDataPlugin from '@fastify/formbody';
1717
import {
1818
createRegistry,
19-
createTaskRunner,
2019
CryptoProvider,
2120
LogFn,
2221
Logger,
@@ -39,7 +38,6 @@ import {
3938
registerTRPC,
4039
reportReadiness,
4140
startMetrics,
42-
traceInline,
4341
TracingInstance,
4442
} from '@hive/service-common';
4543
import { createConnectionString, createStorage as createPostgreSQLStorage } from '@hive/storage';
@@ -209,50 +207,6 @@ export async function main() {
209207
}),
210208
}) as HivePubSub;
211209

212-
let dbPurgeTaskRunner: null | ReturnType<typeof createTaskRunner> = null;
213-
214-
if (!env.hiveServices.commerce) {
215-
server.log.debug('Commerce service is disabled. Skip scheduling purge tasks.');
216-
} else {
217-
server.log.debug(
218-
`Commerce service is enabled. Start scheduling purge tasks every ${env.hiveServices.commerce.dateRetentionPurgeIntervalMinutes} minutes.`,
219-
);
220-
dbPurgeTaskRunner = createTaskRunner({
221-
run: traceInline(
222-
'Purge Task',
223-
{
224-
resultAttributes: result => ({
225-
'purge.schema.check.count': result.deletedSchemaCheckCount,
226-
'purge.sdl.store.count': result.deletedSdlStoreCount,
227-
'purge.change.approval.count': result.deletedSchemaChangeApprovalCount,
228-
'purge.contract.approval.count': result.deletedContractSchemaChangeApprovalCount,
229-
}),
230-
},
231-
async () => {
232-
try {
233-
const result = await storage.purgeExpiredSchemaChecks({
234-
expiresAt: new Date(),
235-
});
236-
server.log.debug(
237-
'Finished running schema check purge task. (deletedSchemaCheckCount=%s deletedSdlStoreCount=%s)',
238-
result.deletedSchemaCheckCount,
239-
result.deletedSdlStoreCount,
240-
);
241-
242-
return result;
243-
} catch (error) {
244-
captureException(error);
245-
throw error;
246-
}
247-
},
248-
),
249-
interval: env.hiveServices.commerce.dateRetentionPurgeIntervalMinutes * 60 * 1000,
250-
logger: server.log,
251-
});
252-
253-
dbPurgeTaskRunner.start();
254-
}
255-
256210
registerShutdown({
257211
logger: server.log,
258212
async onShutdown() {
@@ -262,10 +216,6 @@ export async function main() {
262216
await server.close();
263217
server.log.info('Stopping Storage handler...');
264218
await storage.destroy();
265-
if (dbPurgeTaskRunner) {
266-
server.log.info('Stopping expired schema check purge task...');
267-
await dbPurgeTaskRunner.stop();
268-
}
269219
server.log.info('Shutdown complete.');
270220
},
271221
});

packages/services/workflows/src/environment.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import zod from 'zod';
22
import { OpenTelemetryConfigurationModel } from '@hive/service-common';
33
import { createConnectionString } from '@hive/storage';
4+
import { RequestBroker } from './lib/webhooks/schema-change-notification';
45

56
const isNumberString = (input: unknown) => zod.string().regex(/^\d+$/).safeParse(input).success;
67

@@ -81,6 +82,17 @@ const EmailProviderModel = zod.union([
8182
SendmailEmailModel,
8283
]);
8384

85+
const RequestBrokerModel = zod.union([
86+
zod.object({
87+
REQUEST_BROKER: emptyString(zod.literal('0').optional()),
88+
}),
89+
zod.object({
90+
REQUEST_BROKER: zod.literal('1'),
91+
REQUEST_BROKER_ENDPOINT: zod.string().min(1),
92+
REQUEST_BROKER_SIGNATURE: zod.string().min(1),
93+
}),
94+
]);
95+
8496
const PrometheusModel = zod.object({
8597
PROMETHEUS_METRICS: emptyString(zod.union([zod.literal('0'), zod.literal('1')]).optional()),
8698
PROMETHEUS_METRICS_LABEL_INSTANCE: emptyString(zod.string().optional()),
@@ -112,6 +124,7 @@ const configs = {
112124
prometheus: PrometheusModel.safeParse(process.env),
113125
log: LogModel.safeParse(process.env),
114126
tracing: OpenTelemetryConfigurationModel.safeParse(process.env),
127+
requestBroker: RequestBrokerModel.safeParse(process.env),
115128
};
116129

117130
const environmentErrors: Array<string> = [];
@@ -142,6 +155,7 @@ const sentry = extractConfig(configs.sentry);
142155
const prometheus = extractConfig(configs.prometheus);
143156
const log = extractConfig(configs.log);
144157
const tracing = extractConfig(configs.tracing);
158+
const requestBroker = extractConfig(configs.requestBroker);
145159

146160
const emailProviderConfig =
147161
email.EMAIL_PROVIDER === 'postmark'
@@ -211,4 +225,11 @@ export const env = {
211225
user: postgres.POSTGRES_USER,
212226
}),
213227
},
228+
requestBroker:
229+
requestBroker.REQUEST_BROKER === '1'
230+
? ({
231+
endpoint: requestBroker.REQUEST_BROKER_ENDPOINT,
232+
signature: requestBroker.REQUEST_BROKER_SIGNATURE,
233+
} satisfies RequestBroker)
234+
: null,
214235
} as const;

packages/services/workflows/src/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ const pg = await createPool(env.postgres.connectionString);
1515
const modules = await Promise.all([
1616
import('./tasks/audit-log-export.js'),
1717
import('./tasks/email-verification.js'),
18-
import('./tasks/organization-invite.js'),
18+
import('./tasks/organization-invitation.js'),
1919
import('./tasks/organization-ownership-transfer.js'),
2020
import('./tasks/password-reset.js'),
21+
import('./tasks/purge-expired-schema-checks.js'),
2122
import('./tasks/schema-change-notification.js'),
2223
import('./tasks/usage-rate-limit-exceeded.js'),
2324
import('./tasks/usage-rate-limit-warning.js'),
@@ -29,6 +30,7 @@ const context: Context = {
2930
logger,
3031
email: createEmailProvider(env.email.provider, env.email.emailFrom),
3132
pg,
33+
requestBroker: env.requestBroker,
3234
};
3335

3436
function logLevel(level: GraphileLogLevel) {
@@ -52,8 +54,10 @@ let runner: Runner = await run({
5254
logger: new GraphileLogger(_scope => (level, message, _meta) => {
5355
logger[logLevel(level)](message);
5456
}),
55-
// TODO: define cron jobs!
56-
crontab: ' ',
57+
crontab: `
58+
# Purge expired schema checks every Saturday at 10:00AM
59+
0 10 * * 0 purgeExpiredSchemaChecks
60+
`,
5761
connectionString: env.postgres.connectionString,
5862
taskList: Object.fromEntries(modules.map(module => module.task(context))),
5963
});
Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
1-
import { JobHelpers, Task } from 'graphile-worker';
1+
import type { JobHelpers, Task } from 'graphile-worker';
22
import { z } from 'zod';
3-
import { Logger } from '@graphql-hive/logger';
4-
import { Context } from './context';
3+
import type { Logger } from '@graphql-hive/logger';
4+
import type { Context } from './context';
55

66
export type TaskDefinition<TName extends string, TModel> = {
77
name: TName;
88
schema: z.ZodTypeAny & { _output: TModel };
99
};
1010

11-
export function defineTask<TName extends string, TModel>(
12-
workflow: TaskDefinition<TName, TModel>,
13-
): TaskDefinition<TName, TModel> {
14-
return workflow;
15-
}
16-
1711
type TaskImplementationArgs<TPayload> = {
1812
input: TPayload;
1913
context: Context;
@@ -25,31 +19,52 @@ export type TaskImplementation<TPayload> = (
2519
args: TaskImplementationArgs<TPayload>,
2620
) => Promise<void>;
2721

22+
/**
23+
* Define a task
24+
*/
25+
export function defineTask<TName extends string, TModel>(
26+
workflow: TaskDefinition<TName, TModel>,
27+
): TaskDefinition<TName, TModel> {
28+
return workflow;
29+
}
30+
31+
/**
32+
* Implement a task.
33+
*/
2834
export function implementTask<TPayload>(
2935
taskDefinition: TaskDefinition<string, TPayload>,
3036
implementation: TaskImplementation<TPayload>,
3137
): (context: Context) => [string, Task] {
38+
const schema = z.object({
39+
requestId: z.string().optional(),
40+
input: taskDefinition.schema,
41+
});
42+
3243
return function (context) {
3344
return [
3445
taskDefinition.name,
3546
function (unsafePayload, helpers) {
36-
const input = taskDefinition.schema.parse(unsafePayload);
47+
const payload = schema.parse(unsafePayload);
3748
return implementation({
38-
input,
49+
input: payload.input,
3950
context,
4051
helpers,
4152
logger: context.logger.child({
42-
attrs: {
43-
'job.id': helpers.job.id,
44-
'job.queueId': helpers.job.job_queue_id,
45-
'job.attempts': helpers.job.attempts,
46-
'job.maxAttempts': helpers.job.max_attempts,
47-
'job.priority': helpers.job.priority,
48-
'job.taskId': helpers.job.task_id,
49-
},
53+
'request.id': payload.requestId,
54+
'job.id': helpers.job.id,
55+
'job.queueId': helpers.job.job_queue_id,
56+
'job.attempts': helpers.job.attempts,
57+
'job.maxAttempts': helpers.job.max_attempts,
58+
'job.priority': helpers.job.priority,
59+
'job.taskId': helpers.job.task_id,
5060
}),
5161
});
5262
},
5363
];
5464
};
5565
}
66+
67+
/**
68+
* Schedule a task.
69+
*/
70+
export function scheduleTask<TPayload>(taskDefinition: TaskDefinition<string, TPayload>) {}

packages/services/workflows/src/tasks/audit-log-export.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { defineTask, implementTask } from '../kit.js';
33
import { renderAuditLogsReportEmail } from '../lib/emails/templates/audit-logs-report.js';
44

55
export const AuditLogExportTask = defineTask({
6-
name: 'audit-log-export',
6+
name: 'auditLogExport',
77
schema: z.object({
88
organizationId: z.string(),
99
organizationName: z.string(),

packages/services/workflows/src/tasks/organization-invite.ts

Lines changed: 0 additions & 25 deletions
This file was deleted.

packages/services/workflows/src/tasks/purge-expired-schema-checks.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ import { purgeExpiredSchemaChecks } from '../lib/expired-schema-checks';
44

55
export const PurgeExpiredSchemaChecks = defineTask({
66
name: 'purgeExpiredSchemaChecks',
7-
schema: z.undefined(),
7+
schema: z.unknown(),
88
});
99

1010
export const task = implementTask(PurgeExpiredSchemaChecks, async args => {
11-
await purgeExpiredSchemaChecks({ pool: args.context.pg, expiresAt: new Date() });
11+
args.logger.debug('purging expired schema checks');
12+
const statistics = await purgeExpiredSchemaChecks({
13+
pool: args.context.pg,
14+
expiresAt: new Date(),
15+
});
16+
args.logger.debug({ statistics }, 'finished purging schema checks');
1217
});

0 commit comments

Comments
 (0)