Skip to content

Commit 1b5a9ce

Browse files
chore: more frequent etls (#394)
* don't publish empty merge-request extract events * smaller * oopsie
1 parent 69707da commit 1b5a9ce

File tree

6 files changed

+53
-32
lines changed

6 files changed

+53
-32
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
export const timePeriodOf = (timestamp: number, periodDuration: number, periodStartMargin: number, periodLatency: number) => {
2+
const to = timestamp - (timestamp % periodDuration);
3+
const from = to - (periodDuration + periodStartMargin);
4+
5+
return {
6+
from: new Date(from - periodLatency),
7+
to: new Date(to - periodLatency)
8+
}
9+
}

apps/stack/src/extract/extract-merge-requests.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ export const mergeRequestSenderHandler = createMessageHandler({
6464
{ ...context, db: getTenantDb(message.metadata.tenantId) },
6565
);
6666

67+
if (mergeRequests.length === 0) return;
68+
6769
await extractMergeRequestsEvent.publish(
6870
{
6971
mergeRequestIds: mergeRequests.map((mr) => mr.id),
@@ -154,6 +156,8 @@ export const eventHandler = EventHandler(
154156
{ ...context, db },
155157
);
156158

159+
if (mergeRequests.length === 0 && (paginationInfo.totalPages - paginationInfo.page) === 0) return;
160+
157161
await insertEvent(
158162
{
159163
crawlId: ev.metadata.crawlId,
@@ -166,24 +170,26 @@ export const eventHandler = EventHandler(
166170
{ db, entities: { events } },
167171
);
168172

169-
await extractMergeRequestsEvent.publish(
170-
{
171-
mergeRequestIds: mergeRequests.map((mr) => mr.id),
172-
namespaceId: namespace.id,
173-
repositoryId: repository.id,
174-
},
175-
{
176-
crawlId: ev.metadata.crawlId,
177-
version: 1,
178-
caller: "extract-merge-requests",
179-
sourceControl,
180-
userId: ev.metadata.userId,
181-
timestamp: new Date().getTime(),
182-
from: ev.metadata.from,
183-
to: ev.metadata.to,
184-
tenantId: ev.metadata.tenantId,
185-
},
186-
);
173+
if (mergeRequests.length !== 0) {
174+
await extractMergeRequestsEvent.publish(
175+
{
176+
mergeRequestIds: mergeRequests.map((mr) => mr.id),
177+
namespaceId: namespace.id,
178+
repositoryId: repository.id,
179+
},
180+
{
181+
crawlId: ev.metadata.crawlId,
182+
version: 1,
183+
caller: "extract-merge-requests",
184+
sourceControl,
185+
userId: ev.metadata.userId,
186+
timestamp: new Date().getTime(),
187+
from: ev.metadata.from,
188+
to: ev.metadata.to,
189+
tenantId: ev.metadata.tenantId,
190+
},
191+
);
192+
}
187193

188194
const arrayOfExtractMergeRequests = [];
189195
for (let i = paginationInfo.page + 1; i <= paginationInfo.totalPages; i++) {

apps/stack/src/extract/extract-tenants.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { namespaces, repositories } from "@acme/extract-schema";
77
import { eq } from "drizzle-orm";
88
import { repositorySenderHandler } from "./extract-repository";
99
import { ApiHandler, useJsonBody } from "sst/node/api";
10+
import { timePeriodOf } from "@stack/config/time-period";
1011

1112
export const tenantSenderHandler = createMessageHandler({
1213
queueId: 'ExtractQueue',
@@ -62,18 +63,20 @@ export const cronHandler = async ()=> {
6263
const tenants = getTenants();
6364
const tenantIds = tenants.map(tenant => ({ tenantId: tenant.id }));
6465

65-
const utcTodayAt10AM = new Date();
66-
utcTodayAt10AM.setUTCHours(10, 0, 0, 0);
67-
const utcYesterdayAt10AM = new Date(utcTodayAt10AM);
68-
utcYesterdayAt10AM.setHours(utcTodayAt10AM.getUTCHours() - 24);
66+
const PERIOD_DURATION = 15 * 60 * 1000; // 15 minutes
67+
const PERIOD_START_MARGIN = 5 * 60 * 1000; // 5 minutes
68+
const PERIOD_LATENCY = 8 * 60 * 1000; // extract delay
69+
const { from, to } = timePeriodOf(Date.now(), PERIOD_DURATION, PERIOD_START_MARGIN, -PERIOD_LATENCY);
6970

71+
console.log(`CRON-EXTRACT:`, { from, to, now: new Date() })
72+
7073
await sender.sendAll(tenantIds, {
7174
version: 1,
7275
caller: 'extract-tenant:cronHandler',
7376
timestamp: Date.now(),
7477
userId: CRON_USER_ID,
75-
from: utcYesterdayAt10AM,
76-
to: utcTodayAt10AM,
78+
from,
79+
to,
7780
tenantId: -1, // -1 means no db access ?
7881
});
7982

apps/stack/src/transform/transform-tenant.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { createMessageHandler } from "@stack/config/create-message";
77
import { getTenantDb } from "@stack/config/get-tenant-db";
88
import { and, eq, gt, lt } from "drizzle-orm";
99
import { timelineSenderHandler } from "./transform-timeline";
10+
import { timePeriodOf } from "@stack/config/time-period";
1011

1112
export const tenantSenderHandler = createMessageHandler({
1213
queueId: 'TransformQueue',
@@ -58,17 +59,19 @@ const { sender } = tenantSenderHandler;
5859
export const cronHandler = async () => {
5960
const tenants = getTenants().map(tenant => ({ tenantId: tenant.id }));
6061

61-
const utcTodayAt10AM = new Date();
62-
utcTodayAt10AM.setUTCHours(10, 0, 0, 0);
63-
const utcYesterdayAt10AM = new Date(utcTodayAt10AM);
64-
utcYesterdayAt10AM.setHours(utcTodayAt10AM.getUTCHours() - 24);
62+
const PERIOD_DURATION = 15 * 60 * 1000; // 15 minutes
63+
const PERIOD_START_MARGIN = 5 * 60 * 1000; // 5 minutes
64+
const PERIOD_LATENCY = (15 - 8) * 60 * 1000; // extract delay
65+
const { from, to } = timePeriodOf(Date.now(), PERIOD_DURATION, PERIOD_START_MARGIN, PERIOD_LATENCY);
66+
67+
console.log(`CRON-TRANSFORM:`, { from, to, now: new Date() })
6568

6669
await sender.sendAll(tenants, {
6770
version: 1,
6871
caller: 'transform-tenants:cronHandler',
6972
timestamp: Date.now(),
70-
from: utcYesterdayAt10AM,
71-
to: utcTodayAt10AM,
73+
from,
74+
to,
7275
tenantId: -1,
7376
});
7477
}

apps/stack/stacks/ExtractStack.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ export function ExtractStack({ stack }: StackContext) {
195195

196196
if (ENV.CRON_DISABLED !== 'true') {
197197
new Cron(stack, "ExtractCron", {
198-
schedule: "cron(00 10 * * ? *)",
198+
schedule: "cron(8/15 * * * ? *)",
199199
job: {
200200
function: {
201201
handler: "src/extract/extract-tenants.cronHandler",

apps/stack/stacks/TransformStack.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export function TransformStack({ stack }: StackContext) {
7070

7171
if (ENV.CRON_DISABLED !== 'true') {
7272
new Cron(stack, "TransformCron", {
73-
schedule: "cron(00 13 * * ? *)",
73+
schedule: "cron(0/15 * * * ? *)",
7474
job: {
7575
function: {
7676
handler: "src/transform/transform-tenant.cronHandler",

0 commit comments

Comments
 (0)