Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 123 additions & 63 deletions services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ export class PccProjectConsumer {
// project — useful for triage even when the depth rule is unsupported.
if (parsed.pccProjectId) {
const matched = await findSegmentBySourceId(tx, parsed.pccProjectId)
if (matched) {
if (isAmbiguousMatch(matched)) {
errorDetails.matchedVia =
'sourceId (ambiguous — multiple subprojects share this sourceId)'
errorDetails.candidates = matched.candidates
} else if (matched) {
schemaMismatchMatchedCount++
errorDetails.matchedSegmentId = matched.id
errorDetails.matchedSegmentName = matched.name
Expand Down Expand Up @@ -230,7 +234,6 @@ export class PccProjectConsumer {
})
}
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err)
log.error({ jobId: job.id, err }, 'PCC job failed')

if (this.dryRun) {
Expand All @@ -239,7 +242,9 @@ export class PccProjectConsumer {
await this.releaseClaimBestEffort(job.id)
} else {
try {
await this.metadataStore.markFailed(job.id, errorMessage, {
await this.metadataStore.markFailed(job.id, err, {
transformedCount: upsertedCount,
skippedCount: skippedCount + schemaMismatchCount + missingProjectIdCount,
processingDurationMs: Date.now() - startTime,
})
} catch (updateErr) {
Expand Down Expand Up @@ -272,7 +277,28 @@ export class PccProjectConsumer {

// Step 2: sourceId fallback
if (!segment) {
segment = await findSegmentBySourceId(tx, project.pccProjectId)
const fallback = await findSegmentBySourceId(tx, project.pccProjectId)
if (isAmbiguousMatch(fallback)) {
log.warn(
{
pccProjectId: project.pccProjectId,
pccSlug: project.pccSlug,
candidates: fallback.candidates,
},
'Multiple subproject segments share this sourceId — cannot determine match, skipping',
)
await this.recordSyncError(
project.pccProjectId,
project.pccSlug,
'AMBIGUOUS_SEGMENT_MATCH',
{
sourceId: project.pccProjectId,
candidates: fallback.candidates,
},
)
Comment thread
themarolt marked this conversation as resolved.
return { action: 'SKIPPED' }
}
segment = fallback as SegmentRow | null
}

// Step 3: no match → SKIP (Phase 1: project doesn't exist in CDP yet)
Expand Down Expand Up @@ -419,6 +445,17 @@ interface SegmentRow {
grandparentName: string | null
}

/**
 * Marker object returned by `findSegmentBySourceId` when its query yields more
 * than one row, so the caller cannot pick a single segment.
 */
interface AmbiguousSegmentMatch {
  /** Literal discriminator; `isAmbiguousMatch` tests for exactly `true`. */
  ambiguous: true
  /** The `id` and `name` of each row that matched the lookup. */
  candidates: Pick<SegmentRow, 'id' | 'name'>[]
}

/**
 * Type guard that tells the ambiguity marker apart from a real segment row.
 *
 * @param result - Outcome of a segment lookup: a row, `null`, or the ambiguity marker.
 * @returns `true` only when `result` is non-null and its `ambiguous` field is strictly `true`.
 */
function isAmbiguousMatch(
  result: SegmentRow | null | AmbiguousSegmentMatch,
): result is AmbiguousSegmentMatch {
  if (result === null) {
    return false
  }
  // Read the flag through a widened view: not every member of the union declares it.
  const { ambiguous } = result as Partial<AmbiguousSegmentMatch>
  return ambiguous === true
}

async function findSegmentById(db: DbConnOrTx, segmentId: string): Promise<SegmentRow | null> {
return db.oneOrNone<SegmentRow>(
`SELECT id, name, slug, "parentName", "grandparentName"
Expand All @@ -428,13 +465,20 @@ async function findSegmentById(db: DbConnOrTx, segmentId: string): Promise<Segme
)
}

async function findSegmentBySourceId(db: DbConnOrTx, sourceId: string): Promise<SegmentRow | null> {
return db.oneOrNone<SegmentRow>(
async function findSegmentBySourceId(
db: DbConnOrTx,
sourceId: string,
): Promise<SegmentRow | null | AmbiguousSegmentMatch> {
const rows = await db.manyOrNone<SegmentRow>(
`SELECT id, name, slug, "parentName", "grandparentName"
FROM segments
WHERE "sourceId" = $(sourceId) AND type = 'subproject' AND "tenantId" = $(tenantId)`,
WHERE "sourceId" = $(sourceId) AND type = 'subproject' AND "tenantId" = $(tenantId)
LIMIT 2`,
{ sourceId, tenantId: DEFAULT_TENANT_ID },
)
if (rows.length === 0) return null
if (rows.length === 1) return rows[0]
return { ambiguous: true, candidates: rows.map((r) => ({ id: r.id, name: r.name })) }
}

function detectHierarchyMismatch(segment: SegmentRow, cdpTarget: CdpHierarchyTarget): string[] {
Expand Down Expand Up @@ -483,61 +527,65 @@ async function upsertSegment(
)
}

// Returns true if a name conflict prevented creating the insightsProject row.
// Updates insightsProject rows for ALL segment levels sharing the same sourceId
// (group, project, subproject). The INSERT is restricted to the matched subproject
// segment (identified by segmentId) to avoid duplicating insights projects for
// hierarchy-only segments.
// Returns true if a name conflict prevented writing the insightsProject row.
// The INSERT is restricted to the matched subproject segment (identified by segmentId)
// to avoid duplicating insights projects for hierarchy-only segments.
async function upsertInsightsProject(
db: DbConnOrTx,
segmentId: string,
sourceId: string,
project: ParsedPccProject,
): Promise<boolean> {
// Check for a cross-family name conflict — another PCC project (different sourceId)
// already holds this name in an active insightsProject row.
const conflicting = await db.oneOrNone<{ id: string }>(
`SELECT ip.id
FROM "insightsProjects" ip
JOIN segments s ON s.id = ip."segmentId"
WHERE ip.name = $(name)
AND ip."deletedAt" IS NULL
AND s."sourceId" IS DISTINCT FROM $(sourceId)
AND s."tenantId" = $(tenantId)`,
{ name: project.name, sourceId, tenantId: DEFAULT_TENANT_ID },
// Split UPDATE vs INSERT paths upfront — each needs a different name-collision guard.
const exists = await db.oneOrNone<{ id: string }>(
`SELECT id FROM "insightsProjects" WHERE "segmentId" = $(segmentId) AND "deletedAt" IS NULL`,
{ segmentId },
)
if (conflicting) return true

// Update the matched subproject segment's insightsProject only.
// Scoped to segmentId rather than all siblings sharing sourceId — the bulk-sibling
// approach causes unique-name constraint violations when siblings have pre-existing
// insightsProjects rows with different names that would all be renamed to the same value.
// Slug is intentionally not updated — it is a stable identifier referenced by FK from
// securityInsightsEvaluations and related tables.
// logoUrl won't be updated in InsightsProject until we confirm that the format is
// compatible with the Insights Squared standard. Do NOT reintroduce it as a
// `--`-commented SQL line: pg-promise scans placeholders textually and would still
// require the `logoUrl` param, triggering "Property 'logoUrl' doesn't exist".
await db.none(
`UPDATE "insightsProjects"
SET name = $(name),
description = $(description),
"updatedAt" = NOW()
WHERE "segmentId" = $(segmentId)
AND "deletedAt" IS NULL`,
{
segmentId,
name: project.name,
description: project.description,
},
)
if (exists) {
// UPDATE path. The partial unique index unique_insightsProjects_name is global, so any
// other active row with the target name will collide. This includes same-sourceId duplicate
// subproject segments (data anomaly — e.g. FIDOPower / OpenFIDO where two CDP subprojects
// share one PCC project_id) as well as cross-family conflicts and NULL-segmentId rows.
// We exclude by PK (never null) rather than by segmentId to stay NULL-safe.
const conflicting = await db.oneOrNone<{ id: string }>(
`SELECT ip.id
FROM "insightsProjects" ip
WHERE ip.name = $(name)
AND ip."deletedAt" IS NULL
AND ip.id <> $(id)`,
{ name: project.name, id: exists.id },
)
if (conflicting) return true

// Slug is intentionally not updated — it is a stable identifier referenced by FK from
// securityInsightsEvaluations and related tables.
// logoUrl won't be updated in InsightsProject until we confirm that the format is
// compatible with the Insights Squared standard. Do NOT reintroduce it as a
// `--`-commented SQL line: pg-promise scans placeholders textually and would still
// require the `logoUrl` param, triggering "Property 'logoUrl' doesn't exist".
try {
await db.none(
`UPDATE "insightsProjects"
SET name = $(name),
description = $(description),
"updatedAt" = NOW()
WHERE "segmentId" = $(segmentId)
AND "deletedAt" IS NULL`,
{ segmentId, name: project.name, description: project.description },
)
} catch (err) {
if (isDuplicateKeyError(err)) return true
throw err
}
return false
}

// INSERT for the subproject segment only (the matched leaf).
// Before inserting, check if a same-family sibling (group/project level sharing the same
// sourceId) already holds this name. Shallow hierarchies (eff=1/2) have group+project+subproject
// all sharing the same name and sourceId — the group/project rows are written first and would
// cause a name conflict on the subproject INSERT. Skip the INSERT in that case; the family is
// already represented.
// INSERT path. Two guards before writing:
//
// 1. Same-family skip: a group/project-level segment sharing this sourceId already holds the
// canonical name (shallow eff=1/2 hierarchy). The family is already represented — skip the
// INSERT without recording a conflict.
const sameFamilyNameHolder = await db.oneOrNone(
`SELECT 1
FROM "insightsProjects" ip
Expand All @@ -551,22 +599,34 @@ async function upsertInsightsProject(
)
if (sameFamilyNameHolder) return false

const exists = await db.oneOrNone<{ id: string }>(
`SELECT id FROM "insightsProjects" WHERE "segmentId" = $(segmentId) AND "deletedAt" IS NULL`,
{ segmentId },
// 2. Any remaining active row with this name is a conflict — cross-family, different PCC
// project, or a NULL-segmentId orphan row. The unique index is global and includes those.
// No join needed here: sameFamilyNameHolder above already cleared the same-sourceId case,
// so anything found now is genuinely incompatible.
const conflicting = await db.oneOrNone<{ id: string }>(
`SELECT id FROM "insightsProjects" WHERE name = $(name) AND "deletedAt" IS NULL LIMIT 1`,
{ name: project.name },
)
if (exists) return false
if (conflicting) return true

// logoUrl intentionally omitted from the INSERT column list — see note above.
await db.none(
`INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "isLF")
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), TRUE)`,
{ name: project.name, description: project.description, segmentId },
)

try {
await db.none(
`INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "isLF")
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), TRUE)`,
{ name: project.name, description: project.description, segmentId },
)
} catch (err) {
if (isDuplicateKeyError(err)) return true
throw err
}
return false
}

/**
 * Reports whether a thrown value is an `Error` whose `code` property equals `'23505'`.
 * NOTE(review): '23505' is PostgreSQL's unique_violation SQLSTATE — confirm the driver
 * surfaces it as a string on `code`.
 *
 * @param err - Any caught value.
 * @returns `true` only for `Error` instances carrying `code === '23505'`.
 */
function isDuplicateKeyError(err: unknown): boolean {
  if (!(err instanceof Error)) {
    return false
  }
  if (!('code' in err)) {
    return false
  }
  const { code } = err as { code: unknown }
  return code === '23505'
}

async function insertSyncError(
db: DbConnOrTx,
externalProjectId: string | null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ export class TransformerConsumer {

log.info({ jobId: job.id, ...processingMetrics }, 'Job completed')
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err)
log.error({ jobId: job.id, err }, 'Job failed')

try {
await this.metadataStore.markFailed(job.id, errorMessage, {
await this.metadataStore.markFailed(job.id, err, {
processingDurationMs: Date.now() - startTime,
})
} catch (updateErr) {
Expand Down
30 changes: 28 additions & 2 deletions services/libs/snowflake/src/metadataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ export class MetadataStore {
)
}

async markFailed(jobId: number, error: string, metrics?: Partial<JobMetrics>): Promise<void> {
async markFailed(jobId: number, error: unknown, metrics?: Partial<JobMetrics>): Promise<void> {
await this.db.none(
`UPDATE integration."snowflakeExportJobs"
SET error = $(error), "completedAt" = NOW(),
metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($(metrics)::jsonb, '{}'::jsonb),
"updatedAt" = NOW()
WHERE id = $(jobId)`,
{ jobId, error, metrics: metrics ? JSON.stringify(metrics) : null },
{ jobId, error: serializeJobError(error), metrics: metrics ? JSON.stringify(metrics) : null },
)
}

Expand All @@ -202,6 +202,32 @@ export class MetadataStore {
}
}

/**
 * Serialises an arbitrary thrown value into a JSON string.
 *
 * - `Error` instances become an object with `message`, `name`, `stack`, an optional
 *   `cause` (reduced to its message when the cause is itself an `Error`), plus any
 *   own enumerable properties not already present on that object.
 * - Other non-null objects are serialised as-is.
 * - Everything else is wrapped as `{ message: String(err) }`.
 *
 * If serialisation throws (e.g. circular references) or yields no string (e.g. a
 * `toJSON` that returns `undefined`), the result falls back to `{ message: String(err) }`.
 *
 * @param err - The caught value.
 * @returns A JSON string describing `err`.
 */
function serializeJobError(err: unknown): string {
  try {
    let payload: unknown
    if (err instanceof Error) {
      const obj: Record<string, unknown> = {
        message: err.message,
        name: err.name,
        stack: err.stack,
      }
      // `cause` set through the Error constructor options is non-enumerable, so
      // Object.entries below would miss it — capture it explicitly.
      const cause = (err as { cause?: unknown }).cause
      if (cause !== undefined) {
        obj.cause = cause instanceof Error ? cause.message : cause
      }
      // Copy extra own enumerable fields without overwriting the ones set above.
      for (const [key, value] of Object.entries(err)) {
        if (!(key in obj)) obj[key] = value
      }
      payload = obj
    } else {
      // Non-Error throwables: preserve all fields if it's an object, otherwise wrap in {message}
      payload = typeof err === 'object' && err !== null ? err : { message: String(err) }
    }
    // JSON.stringify may return undefined at runtime despite its declared type.
    const json: unknown = JSON.stringify(payload)
    if (typeof json === 'string') return json
  } catch {
    // Serialisation failed; use the minimal fallback below.
  }
  return JSON.stringify({ message: String(err) })
}

function mapRowToJob(row: {
id: number
platform: string
Expand Down
Loading