Skip to content

Commit c8ba49d

Browse files
Extract Project Members (#77)
* feat: add fetch members to gitlab source control * implement fetch members for github integration * add event-bus trigger to handler * getMembers function * fix wrong signature in interface * get-members function, implemented deki wizardry * fix integration to use satisfies keyword, so i don't have to debug bundled libraries * move around sst types, because why not * add extracting of members 1 page 2 members, for testing purposes... * remove debug log * testing * workable stage * fix: linting * empty commit
1 parent 84e52d2 commit c8ba49d

File tree

14 files changed

+466
-37
lines changed

14 files changed

+466
-37
lines changed

apps/extract-stack/.sst/types/index.ts

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,6 @@ declare module "sst/node/config" {
44
APP: string;
55
STAGE: string;
66
}
7-
}import "sst/node/event-bus";
8-
declare module "sst/node/event-bus" {
9-
export interface EventBusResources {
10-
"ExtractBus": {
11-
eventBusName: string;
12-
}
13-
}
14-
}import "sst/node/queue";
15-
declare module "sst/node/queue" {
16-
export interface QueueResources {
17-
"MRQueue": {
18-
queueUrl: string;
19-
}
20-
}
217
}import "sst/node/config";
228
declare module "sst/node/config" {
239
export interface SecretResources {
@@ -39,6 +25,27 @@ declare module "sst/node/config" {
3925
value: string;
4026
}
4127
}
28+
}import "sst/node/event-bus";
29+
declare module "sst/node/event-bus" {
30+
export interface EventBusResources {
31+
"ExtractBus": {
32+
eventBusName: string;
33+
}
34+
}
35+
}import "sst/node/queue";
36+
declare module "sst/node/queue" {
37+
export interface QueueResources {
38+
"ExtractMemberPageQueue": {
39+
queueUrl: string;
40+
}
41+
}
42+
}import "sst/node/queue";
43+
declare module "sst/node/queue" {
44+
export interface QueueResources {
45+
"MRQueue": {
46+
queueUrl: string;
47+
}
48+
}
4249
}import "sst/node/api";
4350
declare module "sst/node/api" {
4451
export interface ApiResources {

apps/extract-stack/src/create-message.ts

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,25 @@ import AWS from "aws-sdk";
22
import { z } from "zod";
33
import type { ZodRawShape, ZodAny, ZodObject } from "zod";
44
import { nanoid } from "nanoid";
5+
import type { SQSEvent } from "aws-lambda";
56

67
const sqs = new AWS.SQS();
78

89
type Content<Shape extends ZodRawShape> = z.infer<ZodObject<Shape, "strip", ZodAny>>
910

1011
type Send<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = (
11-
content: Content<Shape>,
12-
metadata: MetadataShape
12+
content: Content<Shape>,
13+
metadata: Content<MetadataShape>
1314
) => Promise<void>;
1415

1516
type BatchSend<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = (
1617
content: Content<Shape>[],
17-
metadata: MetadataShape
18+
metadata: Content<MetadataShape>
1819
) => Promise<void>;
1920

2021
type MessageProps<QueueUrl extends string, Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
2122
queueUrl: QueueUrl;
22-
contentShape: Content<Shape>;
23+
contentShape: Shape;
2324
metadataShape: MetadataShape;
2425
};
2526

@@ -46,7 +47,7 @@ export function createBatchMessage<QueueUrl extends string, Shape extends ZodRaw
4647
}
4748

4849
return {
49-
send
50+
send,
5051
}
5152
}
5253

@@ -55,7 +56,7 @@ export function createMessage<QueueUrl extends string, Shape extends ZodRawShape
5556
contentShape,
5657
metadataShape,
5758
}: MessageProps<QueueUrl, Shape, MetadataShape>) {
58-
59+
5960
const messageSchema = z.object({
6061
content: z.object(contentShape),
6162
metadata: z.object(metadataShape),
@@ -70,6 +71,35 @@ export function createMessage<QueueUrl extends string, Shape extends ZodRawShape
7071
}
7172

7273
return {
73-
send
74+
send,
7475
}
7576
}
77+
78+
type Sender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
79+
send: Send<Shape, MetadataShape>;
80+
}
81+
82+
type BatchSender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
83+
send: BatchSend<Shape, MetadataShape>
84+
}
85+
86+
type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
87+
content: Content<Shape>;
88+
metadata: Content<MetadataShape>;
89+
}
90+
91+
export function QueueHandler<Shape extends ZodRawShape, MetadataShape extends ZodRawShape>(
92+
_sender: Sender<Shape, MetadataShape> | BatchSender<Shape, MetadataShape>,
93+
cb: (
94+
evt: MessagePayload<Shape, MetadataShape>[]
95+
) => Promise<void>
96+
) {
97+
/**
98+
* TODO:
99+
* - Do consumers always recieve batches ?
100+
* - How handle processing failures ?
101+
*/
102+
return async (event: SQSEvent) => {
103+
await cb(event.Records.map((record) => JSON.parse(record.body) as MessagePayload<Shape, MetadataShape>))
104+
}
105+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { EventHandler } from "sst/node/event-bus";
2+
import { extractRepositoryEvent } from "./events";
3+
import { Clerk } from "@clerk/clerk-sdk-node";
4+
import { createClient } from "@libsql/client";
5+
import { drizzle } from "drizzle-orm/libsql";
6+
import { getMembers } from "@acme/extract-functions";
7+
import type { Context, GetMembersEntities, GetMembersSourceControl } from "@acme/extract-functions";
8+
import { members, repositoriesToMembers } from "@acme/extract-schema";
9+
import type { Namespace, Repository } from "@acme/extract-schema";
10+
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
11+
import type { Pagination } from "@acme/source-control";
12+
import { Config } from "sst/node/config";
13+
import { extractMemberPageBatchMessage } from "./messages";
14+
15+
import { QueueHandler } from "./create-message";
16+
17+
const clerkClient = Clerk({ secretKey: Config.CLERK_SECRET_KEY });
18+
const client = createClient({ url: Config.DATABASE_URL, authToken: Config.DATABASE_AUTH_TOKEN });
19+
20+
const fetchSourceControlAccessToken = async (userId: string, forgeryIdProvider: 'oauth_github' | 'oauth_gitlab') => {
21+
const [userOauthAccessTokenPayload, ...rest] = await clerkClient.users.getUserOauthAccessToken(userId, forgeryIdProvider);
22+
if (!userOauthAccessTokenPayload) throw new Error("Failed to get token");
23+
if (rest.length !== 0) throw new Error("wtf ?");
24+
25+
return userOauthAccessTokenPayload.token;
26+
}
27+
28+
const initSourceControl = async (userId: string, sourceControl: 'github' | 'gitlab') => {
29+
const accessToken = await fetchSourceControlAccessToken(userId, `oauth_${sourceControl}`);
30+
if (sourceControl === 'github') return new GitHubSourceControl(accessToken);
31+
if (sourceControl === 'gitlab') return new GitlabSourceControl(accessToken);
32+
return null;
33+
}
34+
35+
const db = drizzle(client);
36+
37+
const context: Context<GetMembersSourceControl, GetMembersEntities> = {
38+
entities: {
39+
members,
40+
repositoriesToMembers
41+
},
42+
integrations: {
43+
sourceControl: null,
44+
},
45+
db,
46+
};
47+
48+
type ExtractMembersPageInput = {
49+
namespace: Namespace | null;
50+
repository: Repository;
51+
sourceControl: "github" | "gitlab";
52+
userId: string;
53+
paginationInfo: Pagination | null;
54+
}
55+
56+
const extractMembersPage = async ({ namespace, repository, sourceControl, userId, paginationInfo }: ExtractMembersPageInput) => {
57+
const page = paginationInfo?.page;
58+
const perPage = paginationInfo?.perPage;
59+
60+
try {
61+
context.integrations.sourceControl = await initSourceControl(userId, sourceControl);
62+
} catch (error) {
63+
console.error(error);
64+
return;
65+
}
66+
67+
console.log('processing page', paginationInfo?.page, 'perPage', paginationInfo?.perPage);
68+
69+
const { paginationInfo: resultPaginationInfo } = await getMembers({
70+
externalRepositoryId: repository.externalId,
71+
namespaceName: namespace?.name || "",
72+
repositoryId: repository.id,
73+
repositoryName: repository.name,
74+
perPage: page,
75+
page: perPage
76+
}, context);
77+
78+
return resultPaginationInfo;
79+
};
80+
81+
const range = (a: number, b: number) => Array.apply(0, { length: b - a + 1 } as number[]).map((_, index) => index + a);
82+
const chunks = <T>(array: Array<T>, size: number) => Array.apply(0, { length: Math.ceil(array.length / size) } as unknown[]).map((_, index) => array.slice(index * size, (index + 1) * size));
83+
84+
export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {
85+
86+
const pagination = await extractMembersPage({
87+
namespace: ev.properties.namespace,
88+
repository: ev.properties.repository,
89+
sourceControl: ev.metadata.sourceControl,
90+
userId: ev.metadata.userId,
91+
paginationInfo: { page: 1, perPage: 2, totalPages: 1000 },
92+
});
93+
94+
if (!pagination) return;
95+
96+
const remainingMemberPages = range(2, pagination.totalPages)
97+
.map(page => ({
98+
page,
99+
perPage: pagination.perPage,
100+
totalPages: pagination.totalPages
101+
} satisfies Pagination));
102+
103+
const batchedPages = chunks(remainingMemberPages, 10);
104+
105+
await Promise.all(batchedPages.map(batch => extractMemberPageBatchMessage.send(
106+
batch.map(page => ({
107+
namespace: ev.properties.namespace,
108+
repository: ev.properties.repository,
109+
pagination: page
110+
})), {
111+
version: 1,
112+
caller: 'extract-member',
113+
sourceControl: ev.metadata.sourceControl,
114+
userId: ev.metadata.userId,
115+
timestamp: new Date().getTime(),
116+
})));
117+
});
118+
119+
export const queueHandler = QueueHandler(extractMemberPageBatchMessage, async (messages) => {
120+
console.log('CONSUMER BARRIER',messages.length)
121+
// TODO: partial retries ?
122+
await Promise.all(messages.map(message => extractMembersPage({
123+
namespace: message.content.namespace,
124+
paginationInfo: message.content.pagination,
125+
repository: message.content.repository,
126+
sourceControl: message.metadata.sourceControl,
127+
userId: message.metadata.userId
128+
})))
129+
})

apps/extract-stack/src/messages.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { z } from "zod";
2+
import { RepositorySchema } from "@acme/extract-schema";
3+
import { NamespaceSchema } from "@acme/extract-schema/src/namespaces";
4+
import { createBatchMessage } from "./create-message";
5+
import { Queue } from 'sst/node/queue'
6+
7+
const paginationSchema = z.object({
8+
page: z.number(),
9+
perPage: z.number(),
10+
totalPages: z.number(),
11+
});
12+
13+
const extractMemberPageMessageSchema = z.object({
14+
repository: RepositorySchema,
15+
namespace: z.nullable(NamespaceSchema),
16+
pagination: paginationSchema
17+
});
18+
19+
const metadataSchema = z.object({
20+
version: z.number(),
21+
timestamp: z.number(),
22+
caller: z.string(),
23+
sourceControl: z.literal("github").or(z.literal("gitlab")),
24+
userId: z.string(),
25+
});
26+
27+
export const extractMemberPageBatchMessage = createBatchMessage({
28+
metadataShape: metadataSchema.shape,
29+
contentShape: extractMemberPageMessageSchema.shape,
30+
queueUrl: Queue.ExtractMemberPageQueue.queueUrl
31+
});

apps/extract-stack/src/stack.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,49 @@ import { Config } from "sst/constructs";
44
import { z } from "zod";
55

66
export function ExtractStack({ stack }: StackContext) {
7+
const DATABASE_URL = new Config.Secret(stack, "DATABASE_URL");
8+
const DATABASE_AUTH_TOKEN = new Config.Secret(stack, "DATABASE_AUTH_TOKEN");
9+
const CLERK_SECRET_KEY = new Config.Secret(stack, "CLERK_SECRET_KEY");
710

811
const bus = new EventBus(stack, "ExtractBus", {
12+
rules: {
13+
extractRepository: {
14+
pattern: {
15+
source: ["extract"],
16+
detailType: ["repository"]
17+
}
18+
}
19+
},
920
defaults: {
1021
retries: 10,
1122
function: {
12-
runtime: "nodejs18.x",
13-
},
23+
bind: [DATABASE_URL, CLERK_SECRET_KEY, DATABASE_AUTH_TOKEN],
24+
runtime: 'nodejs18.x'
25+
}
1426
},
1527
});
1628

29+
const membersQueue = new Queue(stack, "ExtractMemberPageQueue");
30+
membersQueue.addConsumer(stack, {
31+
function: {
32+
bind: [bus, membersQueue, DATABASE_URL, CLERK_SECRET_KEY, DATABASE_AUTH_TOKEN], // Issue: need to bind bus because same file
33+
handler: 'src/extract-member.queueHandler'
34+
}
35+
})
36+
37+
bus.addTargets(stack, 'extractRepository', {
38+
'extractMember': {
39+
function: {
40+
bind: [bus, membersQueue],
41+
handler: 'src/extract-member.eventHandler'
42+
}
43+
}
44+
});
45+
1746
const queue = new Queue(stack, "MRQueue", {
1847
// consumer: func.handler,
1948
});
20-
21-
const DATABASE_URL = new Config.Secret(stack, "DATABASE_URL");
22-
const DATABASE_AUTH_TOKEN = new Config.Secret(stack, "DATABASE_AUTH_TOKEN");
23-
const CLERK_SECRET_KEY = new Config.Secret(stack, "CLERK_SECRET_KEY");
24-
49+
2550
const ENVSchema = z.object({
2651
CLERK_JWT_ISSUER: z.string(),
2752
CLERK_JWT_AUDIENCE: z.string(),

packages/functions/extract/src/config.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import type { BetterSQLite3Database } from 'drizzle-orm/better-sqlite3';
22
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
3-
import type { repositories, namespaces, mergeRequests } from '@acme/extract-schema';
3+
import type { repositories, namespaces, mergeRequests, members, repositoriesToMembers } from '@acme/extract-schema';
44
import type { SourceControl } from '@acme/source-control';
55

66
export type Database = BetterSQLite3Database | LibSQLDatabase;
77

88
export type Entities = {
9-
repositories: typeof repositories,
10-
namespaces: typeof namespaces,
11-
mergeRequests: typeof mergeRequests,
9+
repositories: typeof repositories;
10+
namespaces: typeof namespaces;
11+
mergeRequests: typeof mergeRequests;
12+
members: typeof members;
13+
repositoriesToMembers: typeof repositoriesToMembers;
1214
};
1315

1416
export type Context<SC extends Partial<SourceControl>, E extends Partial<Entities>> = {

0 commit comments

Comments
 (0)