Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ export class PccProjectConsumer {
)
return { action: 'SKIPPED' }
}
segment = fallback as SegmentRow | null
segment = fallback
}

// Step 3: no match → SKIP (Phase 1: project doesn't exist in CDP yet)
Expand Down Expand Up @@ -513,7 +513,7 @@ async function upsertSegment(
SET name = $(name),
status = COALESCE($(status)::"segmentsStatus_type", status),
maturity = $(maturity),
description = $(description),
description = COALESCE($(description), description),
"updatedAt" = NOW()
WHERE "sourceId" = $(sourceId) AND "tenantId" = $(tenantId)`,
{
Expand Down Expand Up @@ -560,20 +560,31 @@ async function upsertInsightsProject(

// Slug is intentionally not updated — it is a stable identifier referenced by FK from
// securityInsightsEvaluations and related tables.
// logoUrl won't be updated in InsightsProject until we confirm that the format is
// compatible with the Insights Squared standard. Do NOT reintroduce it as a
// `--`-commented SQL line: pg-promise scans placeholders textually and would still
// require the `logoUrl` param, triggering "Property 'logoUrl' doesn't exist".
// description: COALESCE keeps existing when PCC sends null (CM-1131).
// logoUrl: COALESCE("logoUrl", …) never overrides an existing logo; only fills missing ones (CM-1131).
//
// Wrapped in db.tx() so that when called inside an outer transaction (ITask), pg-promise
// creates a SAVEPOINT. A 23505 failure rolls back only the savepoint, leaving the outer
// transaction intact. Without this, a caught PG error still leaves the transaction in
// an aborted state and all subsequent queries on the same tx would fail.
try {
await db.none(
`UPDATE "insightsProjects"
SET name = $(name),
description = $(description),
"updatedAt" = NOW()
WHERE "segmentId" = $(segmentId)
AND "deletedAt" IS NULL`,
{ segmentId, name: project.name, description: project.description },
)
await db.tx(async (t) => {
await t.none(
`UPDATE "insightsProjects"
SET name = $(name),
description = COALESCE($(description), description),
"logoUrl" = COALESCE("logoUrl", $(logoUrl)),
"updatedAt" = NOW()
WHERE "segmentId" = $(segmentId)
AND "deletedAt" IS NULL`,
{
segmentId,
name: project.name,
description: project.description,
logoUrl: project.logoUrl,
},
)
})
} catch (err) {
if (isDuplicateKeyError(err)) return true
throw err
Expand Down Expand Up @@ -609,15 +620,28 @@ async function upsertInsightsProject(
)
if (conflicting) return true

// logoUrl intentionally omitted from the INSERT column list — see note above.
// Same savepoint rationale as the UPDATE path above.
try {
await db.none(
`INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "isLF")
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), TRUE)`,
{ name: project.name, description: project.description, segmentId },
)
await db.tx(async (t) => {
await t.none(
`INSERT INTO "insightsProjects" (name, slug, description, "logoUrl", "segmentId", "isLF")
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(logoUrl), $(segmentId), TRUE)`,
{
name: project.name,
description: project.description,
logoUrl: project.logoUrl,
segmentId,
},
)
})
} catch (err) {
if (isDuplicateKeyError(err)) return true
if (isDuplicateKeyError(err)) {
// unique_project_segmentId: another worker already inserted a row for this segment
// concurrently — treat as "already represented", no conflict to record.
const constraintName = (err as { constraint?: string }).constraint
if (constraintName === 'unique_project_segmentId') return false
return true
}
throw err
Comment thread
themarolt marked this conversation as resolved.
}
return false
Expand Down
Loading