diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 8b2bd6f188..df4146452f 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -32,7 +32,12 @@ import { updateMemberEnrichmentCacheDb, updateMemberOrg, } from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker' -import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations' +import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo' +import { + addOrgIdentity, + findOrCreateOrganization, + findOrgByVerifiedIdentity, +} from '@crowd/data-access-layer/src/organizations' import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils' import { SearchSyncApiClient } from '@crowd/opensearch' @@ -48,6 +53,7 @@ import { MemberIdentityType, OrganizationAttributeSource, OrganizationIdentityType, + OrganizationMergeSuggestionTable, OrganizationSource, PlatformType, } from '@crowd/types' @@ -375,7 +381,6 @@ export async function updateMemberUsingSquashedPayload( } } - const orgIdsToSync: string[] = [] const newOrUpdatedMemberOrgs = [] if (squashedPayload.memberOrganizations.length > 0) { @@ -420,32 +425,151 @@ export async function updateMemberUsingSquashedPayload( const orgSource = OrganizationAttributeSource.ENRICHMENT orgPromises.push( - findOrCreateOrganization(qx, orgSource, { - displayName: org.name, - description: org.organizationDescription, - identities: identities.map((i) => ({ ...i, source: orgSource })), - }) - .then((orgId) => { - // set the organization id for later use + (async () => { + let orgId: string | undefined + const orgPayload = { + displayName: org.name, + description: org.organizationDescription, + identities: identities.map((i) => ({ ...i, source: orgSource })), + } + + try { + // Keep the org write in a savepoint: if this identity is already verified + // on another org, we can recover without aborting the member update transaction. + orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload)) + } catch (error) { + const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified' + const dbError = error as { constraint?: string; detail?: string } + + if ( + error.constructor?.name !== 'DatabaseError' || + dbError.constraint !== constraint || + !dbError.detail + ) { + throw error + } + + const match = dbError.detail.match(/=\((.*?)\)/) + if (!match) throw error + + const [platform, value, type] = match[1].split(',').map((v) => v.trim()) + const erroredIdentity = { + platform, + value, + type: type as OrganizationIdentityType, + verified: true, + } + + const identityOwners = [] + const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity) + if (!erroredIdentityOwner) throw error + + identityOwners.push({ + identity: erroredIdentity, + organizationId: erroredIdentityOwner.id, + }) + + // The first write normalizes domain identities before failing. Use that normalized + // payload when checking the rest, so the retry won't hit the same index again. + for (const identity of orgPayload.identities.filter((i) => i.verified)) { + const isErroredIdentity = + identity.platform === erroredIdentity.platform && + identity.type === erroredIdentity.type && + identity.value.toLowerCase() === erroredIdentity.value.toLowerCase() + + if (!isErroredIdentity) { + const owner = await findOrgByVerifiedIdentity(qx, identity) + + if (owner) { + identityOwners.push({ identity, organizationId: owner.id }) + } + } + } + + // Keep the enriched org identity as an unverified signal. The verified version stays + // with the existing owner, preserving the unique identity invariant. + const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity) + const retryIdentities = orgPayload.identities.filter( + (identity) => + !identitiesToAddAsUnverified.some( + (identityToAddAsUnverified) => + identity.platform === identityToAddAsUnverified.platform && + identity.type === identityToAddAsUnverified.type && + identity.value.toLowerCase() === + identityToAddAsUnverified.value.toLowerCase(), + ), + ) + + orgId = await qx.tx((trnx) => + findOrCreateOrganization(trnx, orgSource, { + ...orgPayload, + identities: retryIdentities, + }), + ) + + if (orgId) { + const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository( + tx.transaction(), + svc.log, + ) + const mergeSuggestions = [] + const suggestedOwnerIds = new Set() + + for (const identityOwner of identityOwners) { + if (identityOwner.organizationId !== orgId) { + await addOrgIdentity(qx, { + organizationId: orgId, + platform: identityOwner.identity.platform, + value: identityOwner.identity.value, + type: identityOwner.identity.type, + verified: false, + source: orgSource, + }) + + const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds( + identityOwner.organizationId, + ) + if ( + !noMergeIds.includes(orgId) && + !suggestedOwnerIds.has(identityOwner.organizationId) + ) { + suggestedOwnerIds.add(identityOwner.organizationId) + mergeSuggestions.push({ + similarity: 0.95, + organizations: [identityOwner.organizationId, orgId] as [string, string], + }) + } + } + } + + if (mergeSuggestions.length > 0) { + // A shared verified identity is a strong merge signal, unless the pair was + // explicitly marked as no-merge by a reviewer. + await mergeSuggestionsRepo.addToMerge( + mergeSuggestions, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW, + ) + await mergeSuggestionsRepo.addToMerge( + mergeSuggestions, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED, + ) + } + } + } + + if (orgId) { org.organizationId = orgId if (org.identities) { for (const i of org.identities) { i.organizationId = orgId } } - if (orgId) { - orgIdsToSync.push(orgId) - } - }) - .then(() => - Promise.all( - orgIdsToSync.map((orgId) => - syncOrganization(orgId).catch((error) => { - console.error(`Failed to sync organization with ID ${orgId}:`, error) - }), - ), - ), - ), + + await syncOrganization(orgId).catch((error) => { + svc.log.error({ orgId, error }, 'Failed to sync organization') + }) + } + })(), ) }