Skip to content

Commit e9de34f

Browse files
ivke995Ante-Koceicdavidabram
authored
rewrote get-merge-requests (#98)
* rewrote get-merge-requests * Added message for every page of MRs * fixed typings * removed unused imports * internal IDs added * publishing events --------- Co-authored-by: Ante Koceić <ante@crocoder.dev> Co-authored-by: David Abram <david@crocoder.dev>
1 parent 25617b2 commit e9de34f

File tree

6 files changed

+89
-64
lines changed

6 files changed

+89
-64
lines changed

apps/extract-stack/src/events.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@ const metadataSchema = z.object({
1717
userId: z.string(),
1818
});
1919
const extractMergeRequestEventSchema = z.object({
20-
mergeRequests: z.array(MergeRequestSchema),
20+
mergeRequestIds: z.array(MergeRequestSchema.shape.id),
2121
});
2222

23-
export const extractMergeRequestsEvent = {
24-
schemaShape: extractMergeRequestEventSchema.shape,
23+
export type extractMergeRequestsEventMessage = z.infer<typeof extractMergeRequestEventSchema>;
24+
25+
export const extractMergeRequestsEvent = createEvent({
2526
source: "extract",
26-
detailType: "mergeRequest",
27-
};
27+
type: "mergeRequest",
28+
propertiesShape: extractMergeRequestEventSchema.shape,
29+
eventBusName: EventBus.ExtractBus.eventBusName,
30+
metadataShape: metadataSchema.shape,
31+
});
2832

2933
export const extractRepositoryEvent = createEvent({
3034
source: "extract",

apps/extract-stack/src/extract-merge-requests.ts

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ import { EventHandler } from "sst/node/event-bus";
66

77
import {
88
getMergeRequests,
9-
getPaginationData,
109
type Context,
1110
type GetMergeRequestsEntities,
1211
type GetMergeRequestsSourceControl,
1312
} from "@acme/extract-functions";
14-
import { mergeRequests } from "@acme/extract-schema";
13+
import { mergeRequests, namespaces, repositories } from "@acme/extract-schema";
1514
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
1615

17-
import { extractRepositoryEvent } from "./events";
16+
import { extractMergeRequestsEvent, extractRepositoryEvent } from "./events";
1817
import { extractMergeRequestMessage } from "./messages";
18+
import type { extractRepositoryData } from "./messages";
1919
import { QueueHandler } from "./create-message";
20+
import { eq } from "drizzle-orm";
2021

2122
const clerkClient = Clerk({ secretKey: Config.CLERK_SECRET_KEY });
2223
const client = createClient({
@@ -58,38 +59,57 @@ const initSourceControl = async (userId: string, sourceControl: 'github' | 'gitl
5859
}
5960

6061
export const eventHandler = EventHandler(extractRepositoryEvent, async (evt) => {
62+
if (!evt.properties.namespaceId) throw new Error("Missing namespaceId");
63+
64+
const repository = await db.select().from(repositories).where(eq(repositories.id, evt.properties.repositoryId)).get();
65+
const namespace = await db.select().from(namespaces).where(eq(namespaces.id, evt.properties.namespaceId)).get();
66+
67+
if (!repository) throw new Error("invalid repo id");
68+
if (!namespace) throw new Error("Invalid namespace id");
6169

62-
const externalRepositoryId = evt.properties.repository.externalId;
63-
const repositoryName = evt.properties.repository.name;
64-
const namespace = evt.properties.namespace;
65-
const repository = evt.properties.repository;
6670
const sourceControl = evt.metadata.sourceControl;
67-
const repositoryId = evt.properties.repository.id;
6871

6972
context.integrations.sourceControl = await initSourceControl(evt.metadata.userId, sourceControl)
7073

71-
const { paginationInfo } = await getPaginationData(
72-
{
73-
externalRepositoryId: externalRepositoryId,
74-
namespaceName: namespace?.name || "",
75-
repositoryName: repositoryName,
76-
repositoryId: repositoryId,
77-
},
78-
context,
79-
);
74+
const { mergeRequests, paginationInfo } = await getMergeRequests(
75+
{
76+
externalRepositoryId: repository.externalId,
77+
namespaceName: namespace?.name || "",
78+
repositoryName: repository.name,
79+
repositoryId: repository.id,
80+
perPage: 10,
81+
}, context,
82+
);
83+
84+
await extractMergeRequestsEvent.publish({mergeRequestIds: mergeRequests.map(mr => mr.id)}, {
85+
version: 1,
86+
caller: 'extract-merge-requests',
87+
sourceControl,
88+
userId: evt.metadata.userId,
89+
timestamp: new Date().getTime(),
90+
});
91+
92+
const arrayOfExtractMergeRequests: extractRepositoryData[] = [];
93+
for(let i = 2; i <= paginationInfo.totalPages; i++ ) {
94+
arrayOfExtractMergeRequests.push({
95+
repository,
96+
namespace: namespace,
97+
pagination: {
98+
page: i,
99+
perPage: paginationInfo.perPage,
100+
totalPages: paginationInfo.totalPages
101+
}
102+
});
103+
}
104+
105+
await extractMergeRequestMessage.sendAll(arrayOfExtractMergeRequests, {
106+
version: 1,
107+
caller: 'extract-merge-requests',
108+
sourceControl,
109+
userId: evt.metadata.userId,
110+
timestamp: new Date().getTime(),
111+
});
80112

81-
for (let index = 1; index <= paginationInfo.totalPages; index++) {
82-
83-
await extractMergeRequestMessage.send({
84-
repository,
85-
namespace,
86-
pagination: {
87-
page: index,
88-
perPage: paginationInfo.perPage,
89-
totalPages: paginationInfo.totalPages
90-
}
91-
}, { caller: 'extract-merge-requests', timestamp: new Date().getTime(), version: 1, sourceControl, userId: evt.metadata.userId });
92-
}
93113
});
94114

95115
export const queueHandler = QueueHandler(extractMergeRequestMessage, async (message) => {
@@ -103,8 +123,7 @@ export const queueHandler = QueueHandler(extractMergeRequestMessage, async (mess
103123

104124
const {namespace, pagination, repository} = message.content;
105125

106-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
107-
const { mergeRequests } = await getMergeRequests(
126+
const {mergeRequests} = await getMergeRequests(
108127
{
109128
externalRepositoryId: repository.externalId,
110129
namespaceName: namespace?.name || "",
@@ -115,4 +134,13 @@ export const queueHandler = QueueHandler(extractMergeRequestMessage, async (mess
115134
},
116135
context,
117136
);
137+
138+
await extractMergeRequestsEvent.publish({mergeRequestIds: mergeRequests.map(mr => mr.id)}, {
139+
version: 1,
140+
caller: 'extract-merge-requests',
141+
sourceControl: message.metadata.sourceControl,
142+
userId: message.metadata.userId,
143+
timestamp: new Date().getTime(),
144+
});
145+
118146
})

apps/extract-stack/src/messages.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ const extractRepositoryDataSchema = z.object({
1616
pagination: paginationSchema
1717
});
1818

19+
export type extractRepositoryData = z.infer<typeof extractRepositoryDataSchema>;
20+
1921
const metadataSchema = z.object({
2022
version: z.number(),
2123
timestamp: z.number(),
@@ -34,4 +36,4 @@ export const extractMergeRequestMessage = createMessage({
3436
metadataShape: metadataSchema.shape,
3537
contentShape: extractRepositoryDataSchema.shape,
3638
queueUrl: Queue.MRQueue.queueUrl
37-
});
39+
});

apps/extract-stack/src/stack.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ export function ExtractStack({ stack }: StackContext) {
7070
});
7171

7272
mergeRequestQueue.addConsumer(stack, {
73+
cdk: {
74+
eventSource: {
75+
batchSize: 1,
76+
maxConcurrency: 20,
77+
},
78+
},
7379
function: {
7480
bind: [
7581
bus,
Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import type { MergeRequest } from "@acme/extract-schema";
22
import type { ExtractFunction, Entities } from "./config";
33
import type { Pagination, SourceControl } from "@acme/source-control";
4+
import type { LibSQLDatabase } from "drizzle-orm/libsql";
5+
import type { BetterSQLite3Database } from "drizzle-orm/better-sqlite3";
46

57
export type GetMergeRequestsInput = {
68
externalRepositoryId: number;
@@ -25,28 +27,6 @@ export type GetMergeRequestsEntities = Pick<Entities, "mergeRequests">;
2527

2628
export type GetMergeRequestsFunction = ExtractFunction<GetMergeRequestsInput, GetMergeRequestsOutput, GetMergeRequestsSourceControl, GetMergeRequestsEntities>;
2729

28-
export type GetPaginationDataFunction = ExtractFunction<GetMergeRequestsInput, GetPaginationDataOutput, GetMergeRequestsSourceControl, GetMergeRequestsEntities>;
29-
30-
export const getPaginationData: GetPaginationDataFunction = async (
31-
{ externalRepositoryId, namespaceName, repositoryName, repositoryId },
32-
{ integrations },
33-
) => {
34-
if (!integrations.sourceControl) {
35-
throw new Error("Source control integration not configured");
36-
}
37-
38-
const { pagination } = await integrations.sourceControl.fetchMergeRequests(
39-
externalRepositoryId,
40-
namespaceName,
41-
repositoryName,
42-
repositoryId,
43-
{},
44-
);
45-
46-
return {
47-
paginationInfo: pagination,
48-
};
49-
};
5030

5131
export const getMergeRequests: GetMergeRequestsFunction = async (
5232
{ externalRepositoryId, namespaceName, repositoryName, repositoryId, page, perPage},
@@ -59,12 +39,17 @@ export const getMergeRequests: GetMergeRequestsFunction = async (
5939

6040
const { mergeRequests, pagination } = await integrations.sourceControl.fetchMergeRequests(externalRepositoryId, namespaceName, repositoryName, repositoryId, {}, page, perPage);
6141

62-
const insertedMergeRequests = await db.insert(entities.mergeRequests).values(mergeRequests)
63-
.onConflictDoNothing({ target: entities.mergeRequests.externalId }).returning()
64-
.all();
42+
const insertedMergeRequests = await (db as (LibSQLDatabase & BetterSQLite3Database)).transaction(async (tx) => {
43+
return Promise.all(mergeRequests.map(mergeRequest =>
44+
tx.insert(entities.mergeRequests).values(mergeRequest)
45+
.onConflictDoUpdate({ target: entities.mergeRequests.externalId, set: { updatedAt: new Date() } })
46+
.returning()
47+
.get()
48+
));
49+
});
6550

6651
return {
6752
mergeRequests: insertedMergeRequests,
6853
paginationInfo: pagination,
6954
};
70-
};
55+
};

packages/integrations/source-control/src/gitlab/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export class GitlabSourceControl implements SourceControl {
5757
async fetchMergeRequests(externalRepositoryId: number, namespaceName = '', repositoryName = '', repositoryId: number, creationPeriod: TimePeriod = {}, page?: number, perPage?: number): Promise<{ mergeRequests: NewMergeRequest[], pagination: Pagination }> {
5858
const { data, paginationInfo } = await this.api.MergeRequests.all({
5959
projectId: externalRepositoryId,
60-
page,
60+
page: page || 1,
6161
perPage,
6262
pagination: 'offset',
6363
showExpanded: true,

0 commit comments

Comments
 (0)