-
Notifications
You must be signed in to change notification settings - Fork 14
refactor: sheets backfill #8650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…Spaces types This update introduces the oauthStale field to the IntegrationGoogle and IntegrationGrowthSpaces types in the GraphQL schema, indicating whether OAuth credentials are stale. Additionally, the field is integrated into the relevant resolvers and service logic to manage OAuth token refresh states effectively.
This update introduces the StaleOAuthError class to manage scenarios where Google integration OAuth credentials are stale, requiring re-authorization. The error is integrated into the getTeamGoogleAccessToken and getIntegrationGoogleAccessToken functions, preventing token refresh attempts when the integration is marked as stale. Additionally, the appendEventToGoogleSheets function is updated to skip syncing silently when a StaleOAuthError is encountered.
…stale-google-integration
…logic This update removes the StaleOAuthError class and its associated handling from the Google authentication functions. The logic for marking integrations as stale and throwing errors has been eliminated, simplifying the getTeamGoogleAccessToken and getIntegrationGoogleAccessToken functions. The appendEventToGoogleSheets function has also been adjusted to directly retrieve the access token without handling stale integration scenarios.
…upport This update introduces the exportOrder property to journey blocks and modifies the export logic to utilize blockId for data mapping. The changes ensure that data is aligned correctly in exports, preserving the order of columns based on exportOrder. Additionally, the implementation includes updates to the CSV stringifier to handle blockId-based column matching, improving the overall export process for Google Sheets and CSV files.
…multiple schemas This commit removes the oauthStale field from the IntegrationGoogle type in various GraphQL schema files, including api-gateway, api-journeys, and api-journeys-modern. The change ensures consistency across the integration types and simplifies the integration model.
This commit enhances the journey visitor export functionality by building a lookup map from the row data, ensuring that values are aligned correctly with the final header order. This change simplifies the process of matching column values and improves the overall export process for Google Sheets.
This commit introduces error logging for the block export order updates in the journeyVisitorExportToGoogleSheet mutation. If an error occurs during the update process, it logs the error details without rethrowing, ensuring that the mutation can still return success after the sheet export. This enhances the robustness of the export functionality.
…tion This update introduces a job queue for syncing events to Google Sheets, ensuring that events are processed sequentially to avoid race conditions. The appendEventToGoogleSheets function has been modified to fetch sync configurations upfront and add jobs to the queue with retry logic. Additionally, the server's run function now supports concurrency settings, specifically set to 1 for the Google Sheets sync worker to maintain data integrity during processing.
…' into 00-00-MA-refactor-sheets-bull-queue
This commit refactors the event utility tests to incorporate a new Google Sheets sync queue. It replaces the previous email queue mock with a dedicated Google Sheets sync queue and updates the test cases to ensure proper setup and assertions for both email and Google Sheets sync functionalities. Additionally, it introduces checks for null queue scenarios to enhance robustness in the testing framework.
…leSheet mutation This commit refactors the column mapping logic in the journeyVisitorExportToGoogleSheet mutation by removing the unnecessary index parameter from the map function. This change enhances code readability and maintains the functionality of identifying column positions for export.
…com/JesusFilm/core into 00-00-MA-refactor-sheets-bull-queue
This commit introduces a new clearSheet function that allows for clearing all data from a specified Google Sheets tab while preserving the sheet itself. This functionality is particularly useful for backfill operations that require replacing existing content. Additionally, the Google Sheets sync job types have been expanded to include backfill and create operations, enhancing the overall sync capabilities.
WalkthroughMoves Google Sheets syncing to a queue-driven system: adds BullMQ job types and queue, worker services (create/backfill/append), GraphQL mutation for backfill, resolver changes to enqueue jobs, a sheets.clearSheet helper, and test helpers to inject mock queues. Tests updated to expect queue interactions. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Resolver as Export Mutation
participant DB as Database
participant Queue as GoogleSheets Sync Queue
participant Worker as Create/Backfill Worker
participant Sheets as Google Sheets API
rect rgba(150, 200, 255, 0.5)
Client->>Resolver: journeyVisitorExportToGoogleSheet(...)
Resolver->>DB: validate integration & create/find spreadsheet
Resolver->>DB: create GoogleSheetsSync record
Resolver->>Queue: enqueue job (type: create/backfill)
Resolver-->>Client: return
end
Queue->>Worker: deliver job
Worker->>DB: load journey, visitors, headers
Worker->>Sheets: ensure sheet exists
Worker->>Sheets: clearSheet(...) (values:clear)
Worker->>Sheets: write header + batched rows
Worker->>DB: update block exportOrder (if needed)
Worker-->>Queue: complete job
sequenceDiagram
participant EventProducer as Event Mutation
participant AppendFn as appendEventToGoogleSheets
participant DB as Database
participant Queue as GoogleSheets Sync Queue
participant AppendWorker as Append Service
participant Sheets as Google Sheets API
EventProducer->>AppendFn: appendEventToGoogleSheets(row)
AppendFn->>DB: fetch active syncs
alt syncs exist
AppendFn->>Queue: enqueue append job (row + syncs)
AppendFn-->>EventProducer: return
Queue->>AppendWorker: process append job
AppendWorker->>Sheets: append row to sheet (merge/dedupe)
else no syncs
AppendFn-->>EventProducer: return (no-op)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
View your CI Pipeline Execution ↗ for commit fe51c56
☁️ Nx Cloud last updated this comment at |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apis/api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts (1)
202-217: Consider checking deletedAt in existing sync query.The query for
existingSyncdoesn't filter bydeletedAt: null. This means a soft-deleted sync could block creation of a new sync for the same journey/spreadsheet/sheet combination.🐛 Suggested fix
const existingSync = await prisma.googleSheetsSync.findFirst({ where: { teamId: journey.teamId, journeyId, spreadsheetId, - sheetName + sheetName, + deletedAt: null } })
🤖 Fix all issues with AI agents
In `@apis/api-journeys-modern/src/workers/googleSheetsSync/queue.ts`:
- Around line 71-93: Change the three type-guard functions isAppendJob,
isBackfillJob, and isCreateJob to accept data: unknown and perform safe runtime
checks before using the 'in' operator or reading .type; specifically, first
ensure data is a non-null object (typeof data === 'object' && data !== null),
cast to a Record<string, unknown> (e.g., const obj = data as Record<string,
unknown>), then use 'type' in obj and check that obj.type is a string when
comparing to 'append'/'backfill'/'create'; for isAppendJob preserve legacy
behavior by returning true when obj lacks a 'type' property, otherwise compare
obj.type === 'append'.
🧹 Nitpick comments (9)
apis/api-journeys-modern/src/workers/server.ts (1)
28-38: Consider consolidating the concurrency validation logic.The validation condition is duplicated: once for logging (lines 28) and again for constructing
workerOptions(lines 36). Consider extracting to a validated variable to follow DRY:♻️ Suggested refactor
- if (concurrency != null && !(Number.isFinite(concurrency) && concurrency > 0)) { + const validConcurrency = + typeof concurrency === 'number' && + Number.isFinite(concurrency) && + concurrency > 0 + ? concurrency + : undefined + + if (concurrency != null && validConcurrency == null) { logger.warn( { queue: queueName, concurrency }, 'invalid concurrency; using default' ) } - const workerOptions: ConstructorParameters<typeof Worker>[2] = - typeof concurrency === 'number' && Number.isFinite(concurrency) && concurrency > 0 - ? { connection, concurrency } - : { connection } + const workerOptions: ConstructorParameters<typeof Worker>[2] = + validConcurrency != null + ? { connection, concurrency: validConcurrency } + : { connection }apis/api-journeys-modern/src/schema/googleSheetsSync/googleSheetsSyncBackfill.mutation.ts (1)
97-121: Consider logging when the queue is unavailable.When
googleSheetsSyncQueueis null, the mutation succeeds silently without actually triggering a backfill. While this fail-safe pattern is consistent with other queue operations, users may be confused when nothing happens.Consider adding a log or returning a different status when the queue is unavailable to aid debugging in production scenarios.
💡 Optional: Add logging for null queue
// Enqueue the backfill job if (googleSheetsSyncQueue != null) { await googleSheetsSyncQueue.add( 'google-sheets-sync-backfill', // ... job data ) + } else { + console.warn('Google Sheets sync queue unavailable - backfill not enqueued') }apis/api-journeys-modern/src/workers/googleSheetsSync/service/service.ts (3)
40-50: Consider extracting shared constant to reduce duplication.The
journeyBlockSelectconstant is duplicated acrossservice.ts,backfill.ts, andcreate.ts. Consider extracting it to a shared module to ensure consistency and reduce maintenance burden.
242-243: Unsafe type assertion withas any.Casting
idToBlockandjourneyBlockstoanybypasses type safety. Consider defining proper types that match the expected interface ofgetCardHeadingto maintain type safety.♻️ Suggested approach
Define a type that satisfies the
getCardHeadingfunction signature instead of usingas any:// If getCardHeading expects BlockLike[], ensure journeyBlocks conform to that type // or create a mapping function that transforms the data appropriately
298-312: Potential performance concern with large datasets.Reading up to 1 million rows (
A${firstDataRow}:A1000000) to find a visitor ID could be slow for sheets with many rows. Consider:
- Using a more bounded range based on expected data size
- Implementing binary search if data is sorted
- Caching visitor row indices
apis/api-journeys-modern/src/workers/googleSheetsSync/service/backfill.ts (1)
43-129: Significant code duplication with create.ts.The
getJourneyVisitorsgenerator and the data row building logic (lines 284-304) are nearly identical tocreate.ts. Consider extracting these to a shared utility module to improve maintainability.♻️ Suggested approach
Create a shared module like
workers/googleSheetsSync/service/shared.ts:// shared.ts export interface JourneyVisitorExportRow { visitorId: string date: string [key: string]: string } export async function* getJourneyVisitors( journeyId: string, eventWhere: Prisma.EventWhereInput, timezone: string, batchSize: number = 1000 ): AsyncGenerator<JourneyVisitorExportRow> { // ... shared implementation }Also applies to: 284-304
apis/api-journeys-modern/src/workers/googleSheetsSync/service/create.ts (1)
307-335: Potential issue with exportOrder calculation.The exportOrder calculation subtracts
baseColumns.lengthbut usesfindIndexwhich could return the same index for blocks with duplicate blockIds if deduplication wasn't perfect. Additionally,col.exportOrder == nullcheck relies on the column having this field populated from the block, but new columns won't have it.Also,
Promise.allfor individual block updates could be inefficient for many blocks. Consider using a transaction or batch update.♻️ Suggested improvement
if (blocksToUpdate.length > 0) { try { - await Promise.all( - blocksToUpdate.map(({ blockId, exportOrder }) => - prisma.block.update({ - where: { id: blockId }, - data: { exportOrder } - }) - ) - ) + await prisma.$transaction( + blocksToUpdate.map(({ blockId, exportOrder }) => + prisma.block.update({ + where: { id: blockId }, + data: { exportOrder } + }) + ) + ) } catch (error) {apis/api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts (2)
23-29: Inconsistent test helper naming.This test helper is named
__setGoogleSheetsSyncQueueForExportTestswhileutils.tsuses__setGoogleSheetsSyncQueueForTests. Consider using a consistent naming convention across the codebase.♻️ Suggested fix
-export function __setGoogleSheetsSyncQueueForExportTests( +export function __setGoogleSheetsSyncQueueForTests( mockQueue: any ): void { googleSheetsSyncQueue = mockQueue }Alternatively, if both are intentionally separate (different module-level queue variables), ensure the distinction is documented.
31-31: Duplicate constant - consider centralizing.
ONE_HOURis defined identically inutils.ts(line 49). Consider extracting to a shared constants module to avoid duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In
`@apis/api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts`:
- Around line 231-258: The created google-sheets sync record
(prisma.googleSheetsSync.create with syncData producing sync.id) can be left
orphaned if googleSheetsSyncQueue.add throws; change the flow to ensure
atomicity by either wrapping the record creation and queue.add in a single
transaction where possible, or by catching errors from googleSheetsSyncQueue.add
and deleting the newly created record (prisma.googleSheetsSync.delete({ where: {
id: sync.id } })) on failure; ensure you still surface the original error after
cleanup and preserve existing backoff/attempt/removeOnComplete options used when
calling googleSheetsSyncQueue.add.
In `@apis/api-journeys-modern/src/workers/googleSheetsSync/service/create.ts`:
- Around line 286-307: The code buffers all rows into the values array
(initialized with sanitizedHeaderRow) while iterating getJourneyVisitors, which
can OOM for large journeys; change create.ts to write the header once via
writeValues or writeValues({..., values: [sanitizedHeaderRow], append: false })
and then stream visitor rows in fixed-size batches: accumulate aligned rows
(using finalHeader and sanitizeGoogleSheetsCell) into a small batch array and
call writeValues with append: true for each batch, flushing the batch and
continuing until getJourneyVisitors completes; keep references to
getJourneyVisitors, sanitizedHeaderRow, finalHeader, values, and writeValues to
locate the changes.
- Around line 223-237: The current dedupe uses only blockId and drops additional
labels; update the Map keying logic in the header normalization block (where
headerMap and normalizedBlockHeaders are constructed from blockHeadersResult) to
use a composite key combining blockId and the normalizedLabel (e.g.,
`${blockId}-${normalizedLabel}`) so each distinct label per block becomes its
own entry; ensure you still normalize whitespace via the existing
normalizedLabel variable and keep the Map value shape { blockId, label } so
downstream row keys (which use blockId-label) align with these columns.
🧹 Nitpick comments (4)
apis/api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts (2)
11-21: Silent catch block swallows import errors.If the queue import fails for reasons other than being in test mode (e.g., misconfiguration, missing dependency), this will silently set the queue to
nullwithout any diagnostic output. Consider logging the error.♻️ Suggested improvement
} catch { + logger.warn('Failed to load googleSheetsSync queue, sync jobs will not be enqueued') googleSheetsSyncQueue = null }
23-27: Consider adding a type for the mock queue parameter.Using
anyreduces type safety. A minimal interface would help catch test issues earlier.♻️ Optional type improvement
+interface QueueLike { + add: (name: string, data: unknown, opts?: unknown) => Promise<unknown> +} + // Test helper to inject a mock queue // eslint-disable-next-line `@typescript-eslint/naming-convention` -export function __setGoogleSheetsSyncQueueForExportTests(mockQueue: any): void { +export function __setGoogleSheetsSyncQueueForExportTests(mockQueue: QueueLike | null): void { googleSheetsSyncQueue = mockQueue }apis/api-journeys-modern/src/workers/server.ts (1)
28-43: Duplicate concurrency validation logic.The concurrency value is validated twice: once for logging a warning (lines 28-36) and again when constructing
workerOptions(lines 39-41). Consider consolidating to avoid redundancy.♻️ Consolidated validation
+ const isValidConcurrency = + typeof concurrency === 'number' && + Number.isFinite(concurrency) && + concurrency > 0 + if ( - concurrency != null && - !(Number.isFinite(concurrency) && concurrency > 0) + concurrency != null && !isValidConcurrency ) { logger.warn( { queue: queueName, concurrency }, 'invalid concurrency; using default' ) } const workerOptions: ConstructorParameters<typeof Worker>[2] = - typeof concurrency === 'number' && - Number.isFinite(concurrency) && - concurrency > 0 + isValidConcurrency ? { connection, concurrency } : { connection }apis/api-journeys-modern/src/workers/googleSheetsSync/service/create.ts (1)
43-124: Consider cursor-based pagination to avoid skip/offset drift and scale pain.
skip+orderBycan be slow on large tables and can miss/duplicate rows if new visitors/events arrive during the export. A cursor on(createdAt, id)keeps pagination stable and efficient.
.../api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts
Show resolved
Hide resolved
apis/api-journeys-modern/src/workers/googleSheetsSync/service/create.ts
Outdated
Show resolved
Hide resolved
…mezone and simplify event handling This commit modifies the journeyVisitorExportToGoogleSheet mutation to accept a timezone parameter, allowing for better handling of date and time during exports. Additionally, it removes unused filter and select arguments, streamlining the mutation's functionality. The test cases have been updated accordingly to reflect these changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apis/api-journeys-modern/src/workers/googleSheetsSync/service/service.ts (1)
48-51: Guard against missingsyncsfor legacy jobs.If a queued legacy job lacks
syncs,syncs.lengthwill throw and the job will fail. Add a safe check to tolerate undefined payloads.Suggested fix
- if (syncs.length === 0) return + if (!syncs?.length) return
🤖 Fix all issues with AI agents
In
`@apis/api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts`:
- Around line 14-33: Change the untyped any usage to a type-only import for the
queue: define a type alias GoogleSheetsSyncQueue = typeof
import('../../workers/googleSheetsSync/queue').queue and declare
googleSheetsSyncQueue as GoogleSheetsSyncQueue | null (initialize to null)
instead of any; update the test helper __setGoogleSheetsSyncQueueForExportTests
to accept mockQueue: GoogleSheetsSyncQueue | null so callers and usages of
googleSheetsSyncQueue.add() get proper compile-time payload/type safety while
preserving the lazy runtime require behavior.
.../api-journeys-modern/src/schema/journeyVisitor/journeyVisitorExportToGoogleSheet.mutation.ts
Show resolved
Hide resolved
|
I see you added the "on stage" label, I'll get this merged to the stage branch! |
|
Merge conflict attempting to merge this into stage. Please fix manually. |
This update enhances the logic for appending values to Google Sheets by ensuring non-unique values are not duplicated. If the existing value is empty, the new value is added; otherwise, values are joined with a semicolon, preventing duplicates from being appended.
|
Merge conflict attempting to merge this into stage. Please fix manually. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@apis/api-journeys-modern/src/schema/journeyVisitor/export/googleSheetsLiveSync.ts`:
- Around line 342-353: Normalize tokens before deduplication in the append
logic: treat newValue and existing tokens by trimming and removing any leading
sheet-escaping apostrophe before comparing, and short-circuit when newValue is
empty or whitespace-only; in the block that builds existingTokens and checks
includes (references: existingValue, newValue, existingTokens), map tokens
through a normalizer (trim + strip leading "'"), normalize newValue the same
way, skip appending if normalized newValue is empty or already present,
otherwise append the original newValue to preserve display formatting.
apis/api-journeys-modern/src/schema/journeyVisitor/export/googleSheetsLiveSync.ts
Show resolved
Hide resolved
This update introduces a normalization function to trim and strip leading apostrophes from values before appending them to Google Sheets. It ensures that empty or whitespace-only values are not appended, improving data integrity and consistency.
Summary by CodeRabbit
New Features
Changes
✏️ Tip: You can customize this high-level summary in your review settings.