Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 146 additions & 22 deletions services/apps/members_enrichment_worker/src/activities/enrichment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ import {
updateMemberEnrichmentCacheDb,
updateMemberOrg,
} from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker'
import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations'
import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo'
import {
addOrgIdentity,
findOrCreateOrganization,
findOrgByVerifiedIdentity,
} from '@crowd/data-access-layer/src/organizations'
import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { SearchSyncApiClient } from '@crowd/opensearch'
Expand All @@ -48,6 +53,7 @@ import {
MemberIdentityType,
OrganizationAttributeSource,
OrganizationIdentityType,
OrganizationMergeSuggestionTable,
OrganizationSource,
PlatformType,
} from '@crowd/types'
Expand Down Expand Up @@ -375,7 +381,6 @@ export async function updateMemberUsingSquashedPayload(
}
}

const orgIdsToSync: string[] = []
const newOrUpdatedMemberOrgs = []

if (squashedPayload.memberOrganizations.length > 0) {
Expand Down Expand Up @@ -420,32 +425,151 @@ export async function updateMemberUsingSquashedPayload(
const orgSource = OrganizationAttributeSource.ENRICHMENT

orgPromises.push(
findOrCreateOrganization(qx, orgSource, {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
})
.then((orgId) => {
// set the organization id for later use
(async () => {
let orgId: string | undefined
const orgPayload = {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
}

try {
// Keep the org write in a savepoint: if this identity is already verified
// on another org, we can recover without aborting the member update transaction.
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
} catch (error) {
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
const dbError = error as { constraint?: string; detail?: string }

if (
error.constructor?.name !== 'DatabaseError' ||
dbError.constraint !== constraint ||
!dbError.detail
) {
throw error
}

const match = dbError.detail.match(/=\((.*?)\)/)
if (!match) throw error

const [platform, value, type] = match[1].split(',').map((v) => v.trim())
const erroredIdentity = {
platform,
value,
type: type as OrganizationIdentityType,
verified: true,
}

const identityOwners = []
const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
if (!erroredIdentityOwner) throw error

identityOwners.push({
identity: erroredIdentity,
organizationId: erroredIdentityOwner.id,
})

// The first write normalizes domain identities before failing. Use that normalized
// payload when checking the rest, so the retry won't hit the same index again.
for (const identity of orgPayload.identities.filter((i) => i.verified)) {
const isErroredIdentity =
identity.platform === erroredIdentity.platform &&
identity.type === erroredIdentity.type &&
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()

if (!isErroredIdentity) {
const owner = await findOrgByVerifiedIdentity(qx, identity)

if (owner) {
identityOwners.push({ identity, organizationId: owner.id })
}
}
}

// Keep the enriched org identity as an unverified signal. The verified version stays
// with the existing owner, preserving the unique identity invariant.
const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity)
const retryIdentities = orgPayload.identities.filter(
(identity) =>
!identitiesToAddAsUnverified.some(
(identityToAddAsUnverified) =>
identity.platform === identityToAddAsUnverified.platform &&
identity.type === identityToAddAsUnverified.type &&
identity.value.toLowerCase() ===
identityToAddAsUnverified.value.toLowerCase(),
),
)

orgId = await qx.tx((trnx) =>
findOrCreateOrganization(trnx, orgSource, {
...orgPayload,
identities: retryIdentities,
}),
)

if (orgId) {
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
tx.transaction(),
svc.log,
)
const mergeSuggestions = []
const suggestedOwnerIds = new Set<string>()

for (const identityOwner of identityOwners) {
if (identityOwner.organizationId !== orgId) {
await addOrgIdentity(qx, {
organizationId: orgId,
platform: identityOwner.identity.platform,
value: identityOwner.identity.value,
type: identityOwner.identity.type,
verified: false,
source: orgSource,
})

const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(
identityOwner.organizationId,
)
if (
!noMergeIds.includes(orgId) &&
!suggestedOwnerIds.has(identityOwner.organizationId)
) {
suggestedOwnerIds.add(identityOwner.organizationId)
mergeSuggestions.push({
similarity: 0.95,
organizations: [identityOwner.organizationId, orgId] as [string, string],
})
}
}
}

if (mergeSuggestions.length > 0) {
// A shared verified identity is a strong merge signal, unless the pair was
// explicitly marked as no-merge by a reviewer.
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
Comment thread
skwowet marked this conversation as resolved.
}
}
}

if (orgId) {
org.organizationId = orgId
if (org.identities) {
for (const i of org.identities) {
i.organizationId = orgId
}
}
if (orgId) {
orgIdsToSync.push(orgId)
}
})
.then(() =>
Promise.all(
orgIdsToSync.map((orgId) =>
syncOrganization(orgId).catch((error) => {
console.error(`Failed to sync organization with ID ${orgId}:`, error)
}),
),
),
),

await syncOrganization(orgId).catch((error) => {
svc.log.error({ orgId, error }, 'Failed to sync organization')
})
}
})(),
)
}

Expand Down
Loading