From 05b339a6581c02a112d9bcf39f93a7f7b406b989 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:33:35 +0530 Subject: [PATCH 1/2] fix: handle verified organization identity collisions Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/activities/enrichment.ts | 121 ++++++++++++++---- 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 8b2bd6f188..6b32119508 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -32,7 +32,11 @@ import { updateMemberEnrichmentCacheDb, updateMemberOrg, } from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker' -import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations' +import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo' +import { + findOrCreateOrganization, + findOrgByVerifiedIdentity, +} from '@crowd/data-access-layer/src/organizations' import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils' import { SearchSyncApiClient } from '@crowd/opensearch' @@ -48,6 +52,7 @@ import { MemberIdentityType, OrganizationAttributeSource, OrganizationIdentityType, + OrganizationMergeSuggestionTable, OrganizationSource, PlatformType, } from '@crowd/types' @@ -375,7 +380,6 @@ export async function updateMemberUsingSquashedPayload( } } - const orgIdsToSync: string[] = [] const newOrUpdatedMemberOrgs = [] if (squashedPayload.memberOrganizations.length > 0) { @@ -420,32 +424,105 @@ export async function updateMemberUsingSquashedPayload( const orgSource = OrganizationAttributeSource.ENRICHMENT orgPromises.push( - findOrCreateOrganization(qx, orgSource, { - displayName: org.name, - description: org.organizationDescription, - identities: identities.map((i) => ({ ...i, source: orgSource })), - }) - .then((orgId) => { - // set the organization id for later use + (async () => { + let orgId: string | undefined + const orgPayload = { + displayName: org.name, + description: org.organizationDescription, + identities: identities.map((i) => ({ ...i, source: orgSource })), + } + + try { + // Keep the org write in a savepoint: if this identity is already verified + // on another org, we can recover without aborting the member update transaction. + orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload)) + } catch (error) { + const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified' + const dbError = error as { constraint?: string; detail?: string } + + if ( + error.constructor?.name !== 'DatabaseError' || + dbError.constraint !== constraint || + !dbError.detail + ) { + throw error + } + + const match = dbError.detail.match(/=\((.*?)\)/) + if (!match) throw error + + const [platform, value, type] = match[1].split(',').map((v) => v.trim()) + const erroredIdentity = { + platform, + value, + type: type as OrganizationIdentityType, + verified: true, + } + + const owner = await findOrgByVerifiedIdentity(qx, erroredIdentity) + if (!owner) throw error + + // Keep the enriched org identity as an unverified signal. The verified version stays + // with the existing owner, preserving the unique identity invariant. + const demotedIdentities = identities.map((identity) => { + const isMatch = + identity.platform === erroredIdentity.platform && + identity.type === erroredIdentity.type && + identity.value.toLowerCase() === erroredIdentity.value.toLowerCase() + + return isMatch + ? { ...identity, verified: false, source: orgSource } + : { ...identity, source: orgSource } + }) + + orgId = await qx.tx((trnx) => + findOrCreateOrganization(trnx, orgSource, { + ...orgPayload, + identities: demotedIdentities, + }), + ) + + if (orgId && owner.id !== orgId) { + const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository( + tx.transaction(), + svc.log, + ) + const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(owner.id) + if (!noMergeIds.includes(orgId)) { + // A shared verified identity is a strong merge signal, unless the pair was + // explicitly marked as no-merge by a reviewer. + const mergeSuggestions = [ + { + similarity: 0.95, + organizations: [owner.id, orgId] as [string, string], + }, + ] + + await mergeSuggestionsRepo.addToMerge( + mergeSuggestions, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW, + ) + await mergeSuggestionsRepo.addToMerge( + mergeSuggestions, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED, + ) + } + } + } + + if (orgId) { org.organizationId = orgId if (org.identities) { for (const i of org.identities) { i.organizationId = orgId } } - if (orgId) { - orgIdsToSync.push(orgId) - } - }) - .then(() => - Promise.all( - orgIdsToSync.map((orgId) => - syncOrganization(orgId).catch((error) => { - console.error(`Failed to sync organization with ID ${orgId}:`, error) - }), - ), - ), - ), + + await syncOrganization(orgId).catch((error) => { + svc.log.error({ orgId, error }, 'Failed to sync organization') + }) + } + })(), ) } From a0c387ce6f2a720e47b404ee093f00b80fdb4d85 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:50:39 +0530 Subject: [PATCH 2/2] refactor: improve organization identity handling during member updates Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/activities/enrichment.ts | 89 ++++++++++++++----- 1 file changed, 68 insertions(+), 21 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 6b32119508..df4146452f 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -34,6 +34,7 @@ import { } from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker' import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo' import { + addOrgIdentity, findOrCreateOrganization, findOrgByVerifiedIdentity, } from '@crowd/data-access-layer/src/organizations' @@ -459,45 +460,91 @@ export async function updateMemberUsingSquashedPayload( verified: true, } - const owner = await findOrgByVerifiedIdentity(qx, erroredIdentity) - if (!owner) throw error + const identityOwners = [] + const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity) + if (!erroredIdentityOwner) throw error - // Keep the enriched org identity as an unverified signal. The verified version stays - // with the existing owner, preserving the unique identity invariant. - const demotedIdentities = identities.map((identity) => { - const isMatch = + identityOwners.push({ + identity: erroredIdentity, + organizationId: erroredIdentityOwner.id, + }) + + // The first write normalizes domain identities before failing. Use that normalized + // payload when checking the rest, so the retry won't hit the same index again. + for (const identity of orgPayload.identities.filter((i) => i.verified)) { + const isErroredIdentity = identity.platform === erroredIdentity.platform && identity.type === erroredIdentity.type && identity.value.toLowerCase() === erroredIdentity.value.toLowerCase() - return isMatch - ? { ...identity, verified: false, source: orgSource } - : { ...identity, source: orgSource } - }) + if (!isErroredIdentity) { + const owner = await findOrgByVerifiedIdentity(qx, identity) + + if (owner) { + identityOwners.push({ identity, organizationId: owner.id }) + } + } + } + + // Keep the enriched org identity as an unverified signal. The verified version stays + // with the existing owner, preserving the unique identity invariant. + const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity) + const retryIdentities = orgPayload.identities.filter( + (identity) => + !identitiesToAddAsUnverified.some( + (identityToAddAsUnverified) => + identity.platform === identityToAddAsUnverified.platform && + identity.type === identityToAddAsUnverified.type && + identity.value.toLowerCase() === + identityToAddAsUnverified.value.toLowerCase(), + ), + ) orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, { ...orgPayload, - identities: demotedIdentities, + identities: retryIdentities, }), ) - if (orgId && owner.id !== orgId) { + if (orgId) { const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository( tx.transaction(), svc.log, ) - const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(owner.id) - if (!noMergeIds.includes(orgId)) { + const mergeSuggestions = [] + const suggestedOwnerIds = new Set() + + for (const identityOwner of identityOwners) { + if (identityOwner.organizationId !== orgId) { + await addOrgIdentity(qx, { + organizationId: orgId, + platform: identityOwner.identity.platform, + value: identityOwner.identity.value, + type: identityOwner.identity.type, + verified: false, + source: orgSource, + }) + + const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds( + identityOwner.organizationId, + ) + if ( + !noMergeIds.includes(orgId) && + !suggestedOwnerIds.has(identityOwner.organizationId) + ) { + suggestedOwnerIds.add(identityOwner.organizationId) + mergeSuggestions.push({ + similarity: 0.95, + organizations: [identityOwner.organizationId, orgId] as [string, string], + }) + } + } + } + + if (mergeSuggestions.length > 0) { // A shared verified identity is a strong merge signal, unless the pair was // explicitly marked as no-merge by a reviewer. - const mergeSuggestions = [ - { - similarity: 0.95, - organizations: [owner.id, orgId] as [string, string], - }, - ] - await mergeSuggestionsRepo.addToMerge( mergeSuggestions, OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,