From 53b83d37b143e501c51c18857f3edeccb7ce19c6 Mon Sep 17 00:00:00 2001
From: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Date: Tue, 28 Apr 2026 14:15:09 +0530
Subject: [PATCH] chore: backfill email-domain member organization dates

Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
---
 backend/package.json                          |   3 +-
 ...-email-domain-member-organization-dates.ts | 184 ++++++++++++++++++
 ...block-project-organization-affiliations.ts |   4 +-
 .../src/members/organizations.ts              |  85 ++++++++
 .../data-access-layer/src/members/types.ts    |  10 +-
 5 files changed, 282 insertions(+), 4 deletions(-)
 create mode 100644 backend/src/bin/scripts/backfill-email-domain-member-organization-dates.ts

diff --git a/backend/package.json b/backend/package.json
index 5bc3ff15d7..f4a509843c 100644
--- a/backend/package.json
+++ b/backend/package.json
@@ -32,7 +32,8 @@
     "script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
     "script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
     "script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
-    "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
+    "script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
+    "script:backfill-email-domain-member-organization-dates": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/backfill-email-domain-member-organization-dates.ts"
   },
   "lint-staged": {
     "**/*.ts": [
diff --git a/backend/src/bin/scripts/backfill-email-domain-member-organization-dates.ts b/backend/src/bin/scripts/backfill-email-domain-member-organization-dates.ts
new file mode 100644
index 0000000000..487fdec93e
--- /dev/null
+++ b/backend/src/bin/scripts/backfill-email-domain-member-organization-dates.ts
@@ -0,0 +1,184 @@
+/**
+ * One-off backfill for `memberOrganizations` rows with source `email-domain` that have
+ * neither `dateStart` nor `dateEnd`.
+ *
+ * Members are paged by `memberId` (resume with `--afterMemberId`). For each member the
+ * changes returned by `inferMemberOrganizationStintChanges` are applied in one transaction,
+ * then the member is added to the `recalculate-member-affiliations` Redis set.
+ *
+ * NOTE: `--testRun` only limits the run to a single small batch and logs more detail;
+ * the inferred changes are still written to the database.
+ */
+import commandLineArgs from 'command-line-args'
+
+import { inferMemberOrganizationStintChanges } from '@crowd/common_services'
+import {
+  createMemberOrganization,
+  fetchEmailDomainMemberOrganizationActivityDates,
+  fetchEmailDomainMemberOrganizationsWithoutDates,
+  fetchMemberOrganizationsBySource,
+  pgpQx,
+  updateMemberOrganization,
+} from '@crowd/data-access-layer'
+import { getDbConnection } from '@crowd/data-access-layer/src/database'
+import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils'
+import { getServiceLogger } from '@crowd/logging'
+import { getRedisClient } from '@crowd/redis'
+import { OrganizationSource } from '@crowd/types'
+
+import { DB_CONFIG, REDIS_CONFIG } from '@/conf'
+
+const log = getServiceLogger()
+
+const options = [
+  {
+    name: 'testRun',
+    alias: 't',
+    type: Boolean,
+    description: 'Run in test mode (limit to 1 batch and 10 members).',
+  },
+  {
+    name: 'afterMemberId',
+    alias: 'a',
+    type: String,
+    description: 'The member ID to start processing after.',
+  },
+  {
+    name: 'batchSize',
+    alias: 'b',
+    type: Number,
+    description: 'The number of members to fetch in each batch.',
+  },
+  {
+    name: 'help',
+    alias: 'h',
+    type: Boolean,
+    description: 'Print this usage guide.',
+  },
+]
+
+const parameters = commandLineArgs(options)
+
+setImmediate(async () => {
+  // Handle --help first so that asking for usage never opens a connection or starts a backfill.
+  if (parameters.help) {
+    const usage = options.map((o) => `  --${o.name}, -${o.alias}: ${o.description}`).join('\n')
+    log.info(`Usage:\n${usage}`)
+    process.exit(0)
+  }
+
+  const testRun = parameters.testRun ?? false
+  const BATCH_SIZE = parameters.batchSize ?? (testRun ? 10 : 500)
+  let afterMemberId = parameters.afterMemberId ?? undefined
+
+  // Reject values (e.g. NaN from a non-numeric --batchSize) that must not reach the SQL LIMIT.
+  if (!Number.isInteger(BATCH_SIZE) || BATCH_SIZE <= 0) {
+    log.error({ batchSize: parameters.batchSize }, 'batchSize must be a positive integer!')
+    process.exit(1)
+  }
+
+  const db = await getDbConnection({
+    host: DB_CONFIG.writeHost,
+    port: DB_CONFIG.port,
+    database: DB_CONFIG.database,
+    user: DB_CONFIG.username,
+    password: DB_CONFIG.password,
+  })
+
+  const qx = pgpQx(db)
+  const redis = await getRedisClient(REDIS_CONFIG, true)
+
+  log.info({ testRun, BATCH_SIZE, afterMemberId }, 'Running script with the following parameters!')
+
+  let hasMore = true
+
+  while (hasMore) {
+    const memberIds = await fetchEmailDomainMemberOrganizationsWithoutDates(
+      qx,
+      BATCH_SIZE,
+      afterMemberId,
+    )
+
+    if (memberIds.length > 0) {
+      // Work through the batch in chunks of 50 members that are processed concurrently.
+      for (const chunk of chunkArray(memberIds, 50)) {
+        await Promise.all(
+          chunk.map(async (memberId) => {
+            if (testRun) {
+              log.info({ memberId }, 'Processing member!')
+            }
+
+            try {
+              const [existingMemberOrganizations, activityDates] = await Promise.all([
+                fetchMemberOrganizationsBySource(qx, memberId, OrganizationSource.EMAIL_DOMAIN),
+                fetchEmailDomainMemberOrganizationActivityDates(qx, memberId),
+              ])
+
+              const changes = inferMemberOrganizationStintChanges(
+                memberId,
+                existingMemberOrganizations,
+                activityDates,
+              )
+
+              if (testRun) {
+                log.info(
+                  { existingMemberOrganizations, activityDates, changes },
+                  'Previewing changes for member.',
+                )
+              }
+
+              if (changes.length > 0) {
+                await qx.tx(async (tx) => {
+                  for (const change of changes) {
+                    if (change.type === 'insert') {
+                      await createMemberOrganization(tx, memberId, {
+                        organizationId: change.organizationId,
+                        dateStart: change.dateStart,
+                        dateEnd: change.dateEnd,
+                        source: OrganizationSource.EMAIL_DOMAIN,
+                      })
+                    } else if (change.type === 'update') {
+                      await updateMemberOrganization(tx, memberId, change.id, {
+                        dateStart: change.dateStart,
+                        dateEnd: change.dateEnd,
+                      })
+                    }
+
+                    if (testRun) {
+                      log.info(
+                        { memberId, orgId: change.organizationId, type: change.type },
+                        'Member organization updated.',
+                      )
+                    }
+                  }
+                })
+                // Mark for affiliation recalculation only once the transaction has succeeded.
+                await redis.sAdd('recalculate-member-affiliations', [memberId])
+              } else if (testRun) {
+                log.info({ memberId }, 'No changes found for member!')
+              }
+            } catch (err) {
+              log.error({ memberId, err }, 'Failed to process for member!')
+              // Rethrow so the failure propagates out of Promise.all instead of being skipped.
+              throw err
+            }
+          }),
+        )
+      }
+
+      // Advance the cursor to the last member of this batch for the next page.
+      const lastMemberId = memberIds[memberIds.length - 1]
+      afterMemberId = lastMemberId
+
+      log.info({ lastMemberId, count: memberIds.length }, 'Batch processed!')
+
+      if (testRun || memberIds.length < BATCH_SIZE) {
+        hasMore = false
+      }
+    } else {
+      hasMore = false
+    }
+  }
+
+  process.exit(0)
+})
diff --git a/services/apps/script_executor_worker/src/activities/block-project-organization-affiliations.ts b/services/apps/script_executor_worker/src/activities/block-project-organization-affiliations.ts
index 9cc56043f8..479e43056d 100644
--- a/services/apps/script_executor_worker/src/activities/block-project-organization-affiliations.ts
+++ b/services/apps/script_executor_worker/src/activities/block-project-organization-affiliations.ts
@@ -58,7 +58,7 @@ export async function blockMemberOrganizationAffiliation(
 
 export async function markMemberForAffiliationRecalc(memberIds: string[]): Promise<void> {
   try {
-    await svc.redis.sAdd('queue:recalculate:members:affiliation', memberIds)
+    await svc.redis.sAdd('recalculate-member-affiliations', memberIds)
   } catch (error) {
     svc.log.error(error, 'Error marking member for affiliation recalc!')
     throw error
@@ -67,7 +67,7 @@ export async function markMemberForAffiliationRecalc(memberIds: string[]): Promi
 
 export async function getMembersForAffiliationRecalc(batchSize: number): Promise<string[]> {
   try {
-    return svc.redis.sPop('queue:recalculate:members:affiliation', batchSize)
+    return svc.redis.sPop('recalculate-member-affiliations', batchSize)
   } catch (error) {
     svc.log.error(error, 'Error getting members for affiliation recalc!')
     throw error
diff --git a/services/libs/data-access-layer/src/members/organizations.ts b/services/libs/data-access-layer/src/members/organizations.ts
index fcd87bbf18..6ae9ac82b5 100644
--- a/services/libs/data-access-layer/src/members/organizations.ts
+++ b/services/libs/data-access-layer/src/members/organizations.ts
@@ -14,6 +14,8 @@ import {
 import { EntityType } from '../old/apps/script_executor_worker/types'
 import { QueryExecutor } from '../queryExecutor'
 
+import { EmailDomainMemberOrganizationActivityDate } from './types'
+
 /* eslint-disable @typescript-eslint/no-explicit-any */
 
 export async function fetchMemberOrganizations(
@@ -59,6 +61,89 @@ export async function fetchMemberOrganizationsBySource(
   )
 }
 
+/**
+ * Returns one page of distinct member ids that have at least one non-deleted `email-domain`
+ * member organization with neither `dateStart` nor `dateEnd` set, ordered by `memberId`.
+ *
+ * @param qx - query executor to run the select on
+ * @param limit - maximum number of member ids to return
+ * @param afterMemberId - when set, only member ids greater than this value are returned
+ * @returns the member ids of the page, in ascending `memberId` order
+ */
+export async function fetchEmailDomainMemberOrganizationsWithoutDates(
+  qx: QueryExecutor,
+  limit: number,
+  afterMemberId?: string,
+): Promise<string[]> {
+  const rows = await qx.select(
+    `
+    SELECT DISTINCT "memberId"
+    FROM "memberOrganizations"
+    WHERE "source" = 'email-domain'
+      AND "dateStart" IS NULL
+      AND "dateEnd" IS NULL
+      AND "deletedAt" IS NULL
+      ${afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''}
+    ORDER BY "memberId"
+    LIMIT $(limit)
+    `,
+    { limit, afterMemberId },
+  )
+
+  return rows.map((r) => r.memberId)
+}
+
+/**
+ * Returns the distinct calendar dates on which a member has activity under a verified email
+ * identity whose domain equals a verified `primary-domain` identity of one of the member's
+ * non-deleted `email-domain` organizations.
+ *
+ * @param qx - query executor to run the select on
+ * @param memberId - member to look up
+ * @returns one row per (organization, date), ordered by organization and date
+ */
+export async function fetchEmailDomainMemberOrganizationActivityDates(
+  qx: QueryExecutor,
+  memberId: string,
+): Promise<EmailDomainMemberOrganizationActivityDate[]> {
+  return qx.select(
+    `
+    WITH email_domain_member_orgs AS (
+      SELECT DISTINCT
+        mo."memberId",
+        mo."organizationId",
+        lower(oi.value) AS domain
+      FROM "memberOrganizations" mo
+      INNER JOIN "organizationIdentities" oi
+        ON oi."organizationId" = mo."organizationId"
+        AND oi.type = 'primary-domain'
+        AND oi.verified = true
+      WHERE mo."memberId" = $(memberId)
+        AND mo."source" = 'email-domain'
+        AND mo."deletedAt" IS NULL
+    )
+    SELECT DISTINCT
+      edmo."memberId",
+      edmo."organizationId",
+      ar."timestamp"::date::text AS date
+    FROM email_domain_member_orgs edmo
+    INNER JOIN "memberIdentities" mi
+      ON mi."memberId" = edmo."memberId"
+      AND mi.verified = true
+      AND mi.type = 'email'
+      AND mi."deletedAt" IS NULL
+      AND lower(split_part(mi.value, '@', 2)) = edmo.domain
+    INNER JOIN "activityRelations" ar
+      ON ar."memberId" = mi."memberId"
+      AND ar.platform = mi.platform
+      AND lower(ar.username) = lower(mi.value)
+      AND ar."timestamp" IS NOT NULL
+    ORDER BY edmo."memberId", edmo."organizationId", date
+    `,
+    { memberId },
+  )
+}
+
 export async function fetchOrganizationMemberIds(
   qx: QueryExecutor,
   organizationId: string,
diff --git a/services/libs/data-access-layer/src/members/types.ts b/services/libs/data-access-layer/src/members/types.ts
index 25b89e7870..c54ec83069 100644
--- a/services/libs/data-access-layer/src/members/types.ts
+++ b/services/libs/data-access-layer/src/members/types.ts
@@ -1,4 +1,4 @@
-import { IAttributes, IMemberAttribute, MemberAttributeType } from '@crowd/types'
+import { IAttributes, IMemberAttribute, MemberAttributeType, MemberOrgDate } from '@crowd/types'
 
 export interface IQueryNumberOfNewMembers {
   segmentIds?: string[]
@@ -93,3 +93,11 @@ export interface IDbMemberBotSuggestionBySegment {
   avatarUrl: string
   attributes: IAttributes
 }
+
+/**
+ * Row shape returned by `fetchEmailDomainMemberOrganizationActivityDates`:
+ * a `MemberOrgDate` plus the id of the member it belongs to.
+ */
+export interface EmailDomainMemberOrganizationActivityDate extends MemberOrgDate {
+  memberId: string
+}