Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
"script:backfill-email-domain-member-organization-dates": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/backfill-email-domain-member-organization-dates.ts"
},
"lint-staged": {
"**/*.ts": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import commandLineArgs from 'command-line-args'

import { inferMemberOrganizationStintChanges } from '@crowd/common_services'
import {
createMemberOrganization,
fetchEmailDomainMemberOrganizationActivityDates,
fetchEmailDomainMemberOrganizationsWithoutDates,
fetchMemberOrganizationsBySource,
pgpQx,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { getDbConnection } from '@crowd/data-access-layer/src/database'
import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { OrganizationSource } from '@crowd/types'

import { DB_CONFIG, REDIS_CONFIG } from '@/conf'

// Logger used for all output of this script.
const log = getServiceLogger()

// Command-line option definitions; parsed once below into `parameters`.
const options = [
{
// Boolean flag: the script defaults the batch size to 10 and stops after the first batch.
name: 'testRun',
alias: 't',
type: Boolean,
description: 'Run in test mode (limit to 1 batch and 10 members).',
},
{
// Pagination cursor: only member IDs greater than this value are fetched.
name: 'afterMemberId',
alias: 'a',
type: String,
description: 'The member ID to start processing after.',
},
{
// Overrides the default batch size (500, or 10 when testRun is set).
name: 'batchSize',
alias: 'b',
type: Number,
description: 'The number of members to fetch in each batch.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]

// Parsed command-line arguments, keyed by the option `name` values above.
const parameters = commandLineArgs(options)

/**
 * Script entry point: backfills start/end dates on email-domain member organizations.
 *
 * Members are paged by ID through fetchEmailDomainMemberOrganizationsWithoutDates. For each
 * member the script loads the existing email-domain member organizations and the matching
 * activity dates, asks inferMemberOrganizationStintChanges which rows to insert or update,
 * applies those changes inside a single `qx.tx` call, and then adds the member ID to the
 * `recalculate-member-affiliations` Redis set.
 *
 * Flags (see `options`): --testRun, --afterMemberId, --batchSize, --help.
 *
 * NOTE: test mode still writes changes. It only shrinks the default batch size, stops after
 * the first batch and emits additional log lines.
 *
 * Exit codes: 0 on success or after printing the usage guide, 1 on any failure.
 */
setImmediate(async () => {
  // `--help` must never touch the database: list the declared options and stop.
  if (parameters.help) {
    log.info(
      {
        options: options.map(({ name, alias, description }) => ({ name, alias, description })),
      },
      'Usage guide.',
    )
    process.exit(0)
  }

  // Number of members processed concurrently within one fetched batch.
  const MEMBER_CONCURRENCY = 50

  try {
    const testRun = parameters.testRun ?? false
    const batchSize = parameters.batchSize ?? (testRun ? 10 : 500)
    // A value-less flag may be parsed as null; normalise it to "no cursor".
    let afterMemberId = parameters.afterMemberId ?? undefined

    // Reject NaN (non-numeric input), fractions, zero and negatives before they reach LIMIT.
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
      throw new Error(`Invalid batchSize: ${batchSize}. Expected a positive integer.`)
    }

    const db = await getDbConnection({
      host: DB_CONFIG.writeHost,
      port: DB_CONFIG.port,
      database: DB_CONFIG.database,
      user: DB_CONFIG.username,
      password: DB_CONFIG.password,
    })

    const qx = pgpQx(db)
    const redis = await getRedisClient(REDIS_CONFIG, true)

    log.info(
      { testRun, BATCH_SIZE: batchSize, afterMemberId },
      'Running script with the following parameters!',
    )

    // Infers and applies the stint changes for one member; logs and rethrows on failure.
    const processMember = async (memberId: string): Promise<void> => {
      if (testRun) {
        log.info({ memberId }, 'Processing member!')
      }

      try {
        const [existingMemberOrganizations, activityDates] = await Promise.all([
          fetchMemberOrganizationsBySource(qx, memberId, OrganizationSource.EMAIL_DOMAIN),
          fetchEmailDomainMemberOrganizationActivityDates(qx, memberId),
        ])

        const changes = inferMemberOrganizationStintChanges(
          memberId,
          existingMemberOrganizations,
          activityDates,
        )

        if (testRun) {
          log.info(
            { existingMemberOrganizations, activityDates, changes },
            'Previewing changes for member.',
          )
        }

        if (changes.length === 0) {
          if (testRun) {
            log.info({ memberId }, 'No changes found for member!')
          }
          return
        }

        // All changes for one member are applied through the same `tx` executor.
        await qx.tx(async (tx) => {
          for (const change of changes) {
            if (change.type === 'insert') {
              await createMemberOrganization(tx, memberId, {
                organizationId: change.organizationId,
                dateStart: change.dateStart,
                dateEnd: change.dateEnd,
                source: OrganizationSource.EMAIL_DOMAIN,
              })
            } else if (change.type === 'update') {
              await updateMemberOrganization(tx, memberId, change.id, {
                dateStart: change.dateStart,
                dateEnd: change.dateEnd,
              })
            }

            if (testRun) {
              log.info(
                { memberId, orgId: change.organizationId, type: change.type },
                'Member organization updated.',
              )
            }
          }
        })

        // Queue the member for affiliation recalculation only after the writes succeeded.
        await redis.sAdd('recalculate-member-affiliations', [memberId])
      } catch (err) {
        log.error({ memberId, err }, 'Failed to process for member!')
        throw err
      }
    }

    let hasMore = true

    while (hasMore) {
      const memberIds = await fetchEmailDomainMemberOrganizationsWithoutDates(
        qx,
        batchSize,
        afterMemberId,
      )

      if (memberIds.length === 0) {
        break
      }

      for (const chunk of chunkArray(memberIds, MEMBER_CONCURRENCY)) {
        await Promise.all(chunk.map((memberId) => processMember(memberId)))
      }

      // Advance the cursor to the last ID of this batch (the query orders by member ID).
      const lastMemberId = memberIds[memberIds.length - 1]
      afterMemberId = lastMemberId

      log.info({ lastMemberId, count: memberIds.length }, 'Batch processed!')

      // A short batch means the table is exhausted; test mode always stops after one batch.
      hasMore = !testRun && memberIds.length >= batchSize
    }
  } catch (err) {
    // Without this handler a failure would only surface as an unhandled promise rejection.
    log.error({ err }, 'Backfill script failed!')
    process.exit(1)
  }

  process.exit(0)
})
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export async function blockMemberOrganizationAffiliation(

export async function markMemberForAffiliationRecalc(memberIds: string[]): Promise<void> {
try {
await svc.redis.sAdd('queue:recalculate:members:affiliation', memberIds)
await svc.redis.sAdd('recalculate-member-affiliations', memberIds)
Comment thread
skwowet marked this conversation as resolved.
} catch (error) {
svc.log.error(error, 'Error marking member for affiliation recalc!')
throw error
Expand All @@ -67,7 +67,7 @@ export async function markMemberForAffiliationRecalc(memberIds: string[]): Promi

export async function getMembersForAffiliationRecalc(batchSize: number): Promise<string[]> {
try {
return svc.redis.sPop('queue:recalculate:members:affiliation', batchSize)
return svc.redis.sPop('recalculate-member-affiliations', batchSize)
} catch (error) {
Comment thread
skwowet marked this conversation as resolved.
svc.log.error(error, 'Error getting members for affiliation recalc!')
throw error
Expand Down
67 changes: 67 additions & 0 deletions services/libs/data-access-layer/src/members/organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
import { EntityType } from '../old/apps/script_executor_worker/types'
import { QueryExecutor } from '../queryExecutor'

import { EmailDomainMemberOrganizationActivityDate } from './types'

/* eslint-disable @typescript-eslint/no-explicit-any */

export async function fetchMemberOrganizations(
Expand Down Expand Up @@ -59,6 +61,71 @@ export async function fetchMemberOrganizationsBySource(
)
}

/**
 * Returns one page of distinct member IDs that have at least one non-deleted
 * `email-domain` member organization row with both `dateStart` and `dateEnd` unset.
 *
 * @param qx - Query executor used to run the SELECT.
 * @param limit - Maximum number of member IDs to return.
 * @param afterMemberId - Optional cursor; when given, only IDs greater than it are returned.
 * @returns Member IDs in ascending order.
 */
export async function fetchEmailDomainMemberOrganizationsWithoutDates(
  qx: QueryExecutor,
  limit: number,
  afterMemberId?: string,
): Promise<string[]> {
  // The cursor predicate is only part of the query when a cursor was supplied.
  const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''
  const queryParams = { limit, afterMemberId }

  const rows = await qx.select(
    `
SELECT DISTINCT "memberId"
FROM "memberOrganizations"
WHERE "source" = 'email-domain'
AND "dateStart" IS NULL
AND "dateEnd" IS NULL
AND "deletedAt" IS NULL
${cursorClause}
ORDER BY "memberId"
LIMIT $(limit)
`,
    queryParams,
  )

  const memberIds: string[] = []
  for (const row of rows) {
    memberIds.push(row.memberId)
  }
  return memberIds
}

/**
 * Loads the distinct activity dates that tie one member to each of their email-domain
 * organizations.
 *
 * The query first collects the member's non-deleted `email-domain` member organizations
 * together with the lower-cased verified `primary-domain` identity of each organization.
 * It then keeps the member's verified, non-deleted email identities whose domain part
 * equals that organization domain, and joins them to `activityRelations` rows of the same
 * member and platform whose username equals the email (case-insensitively).
 *
 * @param qx - Query executor used to run the SELECT.
 * @param memberId - Member whose activity dates are looked up.
 * @returns Distinct (memberId, organizationId, date) rows, with the date rendered as text,
 *          ordered by member, organization and date.
 */
export async function fetchEmailDomainMemberOrganizationActivityDates(
  qx: QueryExecutor,
  memberId: string,
): Promise<EmailDomainMemberOrganizationActivityDate[]> {
  const queryParams = { memberId }

  const sql = `
WITH email_domain_member_orgs AS (
SELECT DISTINCT
mo."memberId",
mo."organizationId",
lower(oi.value) AS domain
FROM "memberOrganizations" mo
INNER JOIN "organizationIdentities" oi
ON oi."organizationId" = mo."organizationId"
AND oi.type = 'primary-domain'
AND oi.verified = true
WHERE mo."memberId" = $(memberId)
AND mo."source" = 'email-domain'
AND mo."deletedAt" IS NULL
)
SELECT DISTINCT
edmo."memberId",
edmo."organizationId",
ar."timestamp"::date::text AS date
FROM email_domain_member_orgs edmo
INNER JOIN "memberIdentities" mi
ON mi."memberId" = edmo."memberId"
AND mi.verified = true
AND mi.type = 'email'
AND mi."deletedAt" IS NULL
AND lower(split_part(mi.value, '@', 2)) = edmo.domain
INNER JOIN "activityRelations" ar
ON ar."memberId" = mi."memberId"
AND ar.platform = mi.platform
AND lower(ar.username) = lower(mi.value)
AND ar."timestamp" IS NOT NULL
ORDER BY edmo."memberId", edmo."organizationId", date
`

  const activityDates = await qx.select(sql, queryParams)
  return activityDates
}

export async function fetchOrganizationMemberIds(
qx: QueryExecutor,
organizationId: string,
Expand Down
6 changes: 5 additions & 1 deletion services/libs/data-access-layer/src/members/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IAttributes, IMemberAttribute, MemberAttributeType } from '@crowd/types'
import { IAttributes, IMemberAttribute, MemberAttributeType, MemberOrgDate } from '@crowd/types'

export interface IQueryNumberOfNewMembers {
segmentIds?: string[]
Expand Down Expand Up @@ -93,3 +93,7 @@ export interface IDbMemberBotSuggestionBySegment {
avatarUrl: string
attributes: IAttributes
}

/**
 * A `MemberOrgDate` extended with the ID of the member it belongs to.
 */
export interface EmailDomainMemberOrganizationActivityDate extends MemberOrgDate {
/** ID of the member the date entry belongs to. */
memberId: string
}
Loading