From 86aa92a1bdfc112e63bfc1f5fe671d37b572dec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gas=CC=8Cper=20Zgonec?= Date: Thu, 23 Apr 2026 09:18:49 +0200 Subject: [PATCH 1/2] Added supports for pagination, control protocol v2 and selective extraction. --- code/.env.example | 2 + .../fixtures/start_extracting_data/event.json | 16 ++ .../extraction_scope.json | 11 + .../fixtures/start_extracting_data/state.json | 5 + .../event.json | 16 ++ .../extraction_scope.json | 5 + .../state.json | 5 + .../event.json | 16 ++ .../extraction_scope.json | 5 + .../state.json | 5 + .../event.json | 12 + code/fixtures/start_loading_data/event.json | 14 + code/fixtures/start_loading_data/state.json | 1 + code/package-lock.json | 8 +- code/package.json | 2 +- code/scripts/deploy.sh | 49 ---- .../external-system/data-denormalization.ts | 17 +- .../functions/external-system/http-client.ts | 244 +++++++++++------- .../extraction/workers/data-extraction.ts | 78 ++++-- .../loading/workers/load-attachments.ts | 9 +- code/src/main.ts | 8 +- code/src/test-runner/test-runner.ts | 160 +++++++++--- manifest.yaml | 3 +- 23 files changed, 485 insertions(+), 206 deletions(-) create mode 100644 code/.env.example create mode 100644 code/fixtures/start_extracting_data/event.json create mode 100644 code/fixtures/start_extracting_data/extraction_scope.json create mode 100644 code/fixtures/start_extracting_data/state.json create mode 100644 code/fixtures/start_extracting_data_selective/event.json create mode 100644 code/fixtures/start_extracting_data_selective/extraction_scope.json create mode 100644 code/fixtures/start_extracting_data_selective/state.json create mode 100644 code/fixtures/start_extracting_data_with_timestamps/event.json create mode 100644 code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json create mode 100644 code/fixtures/start_extracting_data_with_timestamps/state.json create mode 100644 code/fixtures/start_extracting_external_sync_units/event.json create mode 100644 code/fixtures/start_loading_data/event.json create mode 100644 code/fixtures/start_loading_data/state.json delete mode 100755 code/scripts/deploy.sh diff --git a/code/.env.example b/code/.env.example new file mode 100644 index 0000000..bb4d54c --- /dev/null +++ b/code/.env.example @@ -0,0 +1,2 @@ +# TODO: add External system specific variables that will be replaced in the Fixture tests that you want to run. +TODO_API_KEY=test-api-key diff --git a/code/fixtures/start_extracting_data/event.json b/code/fixtures/start_extracting_data/event.json new file mode 100644 index 0000000..d1cb3aa --- /dev/null +++ b/code/fixtures/start_extracting_data/event.json @@ -0,0 +1,16 @@ +{ + "payload": { + "connection_data": { + "org_id": "test_org", + "org_name": "Test Organization", + "key": "${TODO_API_KEY}", + "key_type": "pat" + }, + "event_type": "START_EXTRACTING_DATA", + "event_context": { + "external_sync_unit_id": "test_external_sync_unit_id", + "extract_from": "1970-01-01T00:00:00.000Z", + "extract_to": "2099-12-31T23:59:59.999Z" + } + } +} diff --git a/code/fixtures/start_extracting_data/extraction_scope.json b/code/fixtures/start_extracting_data/extraction_scope.json new file mode 100644 index 0000000..d679c3b --- /dev/null +++ b/code/fixtures/start_extracting_data/extraction_scope.json @@ -0,0 +1,11 @@ +{ + "todos": { + "extract": true + }, + "users": { + "extract": true + }, + "attachments": { + "extract": true + } +} diff --git a/code/fixtures/start_extracting_data/state.json b/code/fixtures/start_extracting_data/state.json new file mode 100644 index 0000000..3d5b4c0 --- /dev/null +++ b/code/fixtures/start_extracting_data/state.json @@ -0,0 +1,5 @@ +{ + "todos": { "completed": false }, + "users": { "completed": false }, + "attachments": { "completed": false } +} diff --git a/code/fixtures/start_extracting_data_selective/event.json b/code/fixtures/start_extracting_data_selective/event.json new file mode 100644 index 0000000..5fac93a --- /dev/null +++ b/code/fixtures/start_extracting_data_selective/event.json @@ -0,0 +1,16 @@ +{ + "payload": { + "connection_data": { + "org_id": "test_org", + "org_name": "Test Organization", + "key": "test-api-key", + "key_type": "pat" + }, + "event_type": "START_EXTRACTING_DATA", + "event_context": { + "external_sync_unit_id": "test_external_sync_unit_id", + "extract_from": "1970-01-01T00:00:00.000Z", + "extract_to": "2099-12-31T23:59:59.999Z" + } + } +} diff --git a/code/fixtures/start_extracting_data_selective/extraction_scope.json b/code/fixtures/start_extracting_data_selective/extraction_scope.json new file mode 100644 index 0000000..f4d21c6 --- /dev/null +++ b/code/fixtures/start_extracting_data_selective/extraction_scope.json @@ -0,0 +1,5 @@ +{ + "todos": { "extract": true }, + "users": { "extract": false }, + "attachments": { "extract": false } +} diff --git a/code/fixtures/start_extracting_data_selective/state.json b/code/fixtures/start_extracting_data_selective/state.json new file mode 100644 index 0000000..3d5b4c0 --- /dev/null +++ b/code/fixtures/start_extracting_data_selective/state.json @@ -0,0 +1,5 @@ +{ + "todos": { "completed": false }, + "users": { "completed": false }, + "attachments": { "completed": false } +} diff --git a/code/fixtures/start_extracting_data_with_timestamps/event.json b/code/fixtures/start_extracting_data_with_timestamps/event.json new file mode 100644 index 0000000..9d13ae2 --- /dev/null +++ b/code/fixtures/start_extracting_data_with_timestamps/event.json @@ -0,0 +1,16 @@ +{ + "payload": { + "connection_data": { + "org_id": "test_org", + "org_name": "Test Organization", + "key": "test-api-key", + "key_type": "pat" + }, + "event_type": "START_EXTRACTING_DATA", + "event_context": { + "external_sync_unit_id": "test_external_sync_unit_id", + "extract_from": "2024-01-01T00:00:00.000Z", + "extract_to": "2026-04-14T00:00:00.000Z" + } + } +} diff --git a/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json b/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json new file mode 100644 index 0000000..3562132 --- /dev/null +++ b/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json @@ -0,0 +1,5 @@ +{ + "todos": { "extract": true }, + "users": { "extract": true }, + "attachments": { "extract": true } +} diff --git a/code/fixtures/start_extracting_data_with_timestamps/state.json b/code/fixtures/start_extracting_data_with_timestamps/state.json new file mode 100644 index 0000000..3d5b4c0 --- /dev/null +++ b/code/fixtures/start_extracting_data_with_timestamps/state.json @@ -0,0 +1,5 @@ +{ + "todos": { "completed": false }, + "users": { "completed": false }, + "attachments": { "completed": false } +} diff --git a/code/fixtures/start_extracting_external_sync_units/event.json b/code/fixtures/start_extracting_external_sync_units/event.json new file mode 100644 index 0000000..7a9e79e --- /dev/null +++ b/code/fixtures/start_extracting_external_sync_units/event.json @@ -0,0 +1,12 @@ +{ + "function_name": "extraction", + "payload": { + "event_type": "START_EXTRACTING_EXTERNAL_SYNC_UNITS" + }, + "connection_data": { + "org_id": "test_org", + "org_name": "Test Organization", + "key": "test-api-key", + "key_type": "pat" + } +} diff --git a/code/fixtures/start_loading_data/event.json b/code/fixtures/start_loading_data/event.json new file mode 100644 index 0000000..0b23b9f --- /dev/null +++ b/code/fixtures/start_loading_data/event.json @@ -0,0 +1,14 @@ +{ + "payload": { + "connection_data": { + "org_id": "test_org", + "org_name": "Test Organization", + "key": "test-api-key", + "key_type": "pat" + }, + "event_type": "START_LOADING_DATA", + "event_context": { + "external_sync_unit_id": "test_external_sync_unit_id" + } + } +} diff --git a/code/fixtures/start_loading_data/state.json b/code/fixtures/start_loading_data/state.json new file mode 100644 index 0000000..0967ef4 --- /dev/null +++ b/code/fixtures/start_loading_data/state.json @@ -0,0 +1 @@ +{} diff --git a/code/package-lock.json b/code/package-lock.json index 790fc88..998853b 100644 --- a/code/package-lock.json +++ b/code/package-lock.json @@ -9,7 +9,7 @@ "version": "1.1.6", "license": "ISC", "dependencies": { - "@devrev/ts-adaas": "1.18.0", + "@devrev/ts-adaas": "1.19.1-beta.0", "@devrev/typescript-sdk": "1.1.63", "axios": "^1.9.0", "dotenv": "^16.0.3", @@ -1874,9 +1874,9 @@ } }, "node_modules/@devrev/ts-adaas": { - "version": "1.18.0", - "resolved": "https://registry.npmjs.org/@devrev/ts-adaas/-/ts-adaas-1.18.0.tgz", - "integrity": "sha512-LMV7AGWi5hKbhuGHdBW2+mGJGn4OpdKPP1xzKGc13gTFC3KWuqADuuYifRE/QjPslkA6+RuX0Hx4vxeHkp/uqg==", + "version": "1.19.1-beta.0", + "resolved": "https://registry.npmjs.org/@devrev/ts-adaas/-/ts-adaas-1.19.1-beta.0.tgz", + "integrity": "sha512-nEsMk1jtlgpz4l/YnAjVFL/FrvBwumAC0CpRVhIEMJpyAF6y07x+Yz1edinfC/v7lGRGzp94VnvLxfDimi0JIA==", "license": "ISC", "dependencies": { "@devrev/typescript-sdk": "^1.1.59", diff --git a/code/package.json b/code/package.json index a4655b8..5ff19e7 100644 --- a/code/package.json +++ b/code/package.json @@ -56,7 +56,7 @@ "yargs": "^17.6.2" }, "dependencies": { - "@devrev/ts-adaas": "1.18.0", + "@devrev/ts-adaas": "1.19.1-beta.0", "@devrev/typescript-sdk": "1.1.63", "axios": "^1.9.0", "dotenv": "^16.0.3", diff --git a/code/scripts/deploy.sh b/code/scripts/deploy.sh deleted file mode 100755 index 08eac7e..0000000 --- a/code/scripts/deploy.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env bash - -echo "Creating Snap-in version..." - -# Create a new snap-in version -VER_OUTPUT=$(devrev snap_in_version create-one \ - --path "." \ - --create-package | tee /dev/tty) - -# Filter the output to get the snap-in version ID -FILTERED_OUTPUT=$(grep "snap_in_version" <<<"$VER_OUTPUT" | grep -o '{.*}') - -# Check if DevRev CLI returned an error (error messages contain the field 'message') -if ! jq '.message' <<<"$FILTERED_OUTPUT" | grep null >/dev/null; then - exit 1 -fi - -# Get the snap-in version ID -VERSION_ID=$(jq -r '.snap_in_version.id' <<<"$FILTERED_OUTPUT") - -echo "Waiting 10 seconds for Snap-in version to be ready..." -sleep 10 - -# Wait for the snap-in version to be ready -while :; do - VER_OUTPUT2=$(devrev snap_in_version show "$VERSION_ID") - STATE=$(jq -r '.snap_in_version.state' <<<"$VER_OUTPUT2") - if [[ "$STATE" == "build_failed" ]] || [[ "$STATE" == "deployment_failed" ]]; then - echo "Snap-in version build/deployment failed: $(jq -r '.snap_in_version.failure_reason' <<<"$VER_OUTPUT2")" - exit 1 - elif [[ "$STATE" == "ready" ]]; then - break - else - echo "Snap-in version's state is $STATE, waiting 10 seconds..." - sleep 10 - fi -done - -echo "Creating Snap-in draft..." - -# Create a new snap-in draft -DRAFT_OUTPUT=$(devrev snap_in draft --snap_in_version "$VERSION_ID") -jq <<<"$DRAFT_OUTPUT" -echo "Snap-in draft created. Please go to the Snap-ins page in the DevRev UI to complete the installation process." - -# Check if DevRev CLI returned an error (error messages contain the field 'message') -if ! jq '.message' <<<"$DRAFT_OUTPUT" | grep null >/dev/null; then - exit 1 -fi diff --git a/code/src/functions/external-system/data-denormalization.ts b/code/src/functions/external-system/data-denormalization.ts index 3c7597e..c6cb7d5 100644 --- a/code/src/functions/external-system/data-denormalization.ts +++ b/code/src/functions/external-system/data-denormalization.ts @@ -1,5 +1,5 @@ -import { ExternalSystemItem } from '@devrev/ts-adaas'; -import { ExternalTodo } from './types'; +import { ExternalSystemAttachment, ExternalSystemItem } from '@devrev/ts-adaas'; +import { ExternalAttachment, ExternalTodo } from './types'; // TODO: Replace with the actual denormalization function for your external // system. This function should take the normalized object and transform it into @@ -15,3 +15,16 @@ export function denormalizeTodo(item: ExternalSystemItem): ExternalTodo { modified_date: item.modified_date, }; } + +// TODO: Replace with the actual denormalization function for attachments. +// Maps the normalized ExternalSystemAttachment (as received from the loading +// pipeline) back into the format expected by the external system API. +export function denormalizeAttachment(item: ExternalSystemAttachment): ExternalAttachment { + return { + id: item.reference_id, + url: item.url, + file_name: item.file_name, + author_id: item.created_by_id, + parent_id: item.parent_id ?? item.parent_reference_id, + }; +} diff --git a/code/src/functions/external-system/http-client.ts b/code/src/functions/external-system/http-client.ts index 947c49e..9fb20ba 100644 --- a/code/src/functions/external-system/http-client.ts +++ b/code/src/functions/external-system/http-client.ts @@ -1,124 +1,184 @@ import { AirdropEvent, ExternalSystemItemLoadingResponse } from '@devrev/ts-adaas'; import { ExternalAttachment, ExternalTodo, ExternalTodoList, ExternalUser } from './types'; +// --------------------------------------------------------------------------- +// In-memory data store +// +// This simulates an external system's database. In a real connector you would +// replace the methods below with actual HTTP calls to your external API. +// +// The items span a wide range of dates so that the time-window filtering +// behaviour (driven by CPv2's extract_from / extract_to) is clearly visible: +// - Some items are old (pre-2020) and will only appear in a full sync. +// - Some items are recent (2024-2026) and will appear in both full and +// narrow-window syncs. +// --------------------------------------------------------------------------- + +const USERS: ExternalUser[] = [ + { id: 'user-1', created_date: '2018-03-10T08:00:00Z', modified_date: '2018-03-10T08:00:00Z', email: 'alice@example.com', name: 'Alice Johnson' }, + { id: 'user-2', created_date: '2019-07-22T14:30:00Z', modified_date: '2019-07-22T14:30:00Z', email: 'bob@example.com', name: 'Bob Smith' }, + { id: 'user-3', created_date: '2020-11-05T09:15:00Z', modified_date: '2021-02-18T11:00:00Z', email: 'carol@example.com', name: 'Carol White' }, + { id: 'user-4', created_date: '2021-04-12T16:45:00Z', modified_date: '2022-08-30T10:20:00Z', email: 'dave@example.com', name: 'Dave Brown' }, + { id: 'user-5', created_date: '2022-09-01T07:00:00Z', modified_date: '2023-01-15T13:00:00Z', email: 'eve@example.com', name: 'Eve Davis' }, + { id: 'user-6', created_date: '2023-03-20T12:00:00Z', modified_date: '2023-06-10T09:30:00Z', email: 'frank@example.com', name: 'Frank Miller' }, + { id: 'user-7', created_date: '2023-10-01T08:00:00Z', modified_date: '2024-01-05T14:00:00Z', email: 'grace@example.com', name: 'Grace Lee' }, + { id: 'user-8', created_date: '2024-02-14T10:00:00Z', modified_date: '2024-05-20T16:00:00Z', email: 'henry@example.com', name: 'Henry Zhang' }, + { id: 'user-9', created_date: '2024-08-01T09:00:00Z', modified_date: '2025-01-10T11:00:00Z', email: 'irene@example.com', name: 'Irene Park' }, + { id: 'user-10', created_date: '2025-03-15T08:30:00Z', modified_date: '2026-01-20T10:00:00Z', email: 'jack@example.com', name: 'Jack Wilson' }, +]; + +const TODO_LISTS: ExternalTodoList[] = [ + { id: 'list-1', name: 'Work Tasks', description: 'Tasks related to work', item_count: 5, item_type: 'todos' }, + { id: 'list-2', name: 'Personal Tasks', description: 'Personal errands and goals', item_count: 5, item_type: 'todos' }, + { id: 'list-3', name: 'Project Tasks', description: 'Tasks for the current project', item_count: 5, item_type: 'todos' }, +]; + +const TODOS: ExternalTodo[] = [ + { id: 'todo-1', created_date: '2018-06-01T08:00:00Z', modified_date: '2018-06-01T08:00:00Z', title: 'Set up development environment', body: '

Install Node.js, configure editor, clone repos.

', creator: 'user-1', owner: 'user-1' }, + { id: 'todo-2', created_date: '2019-09-15T10:00:00Z', modified_date: '2019-09-15T10:00:00Z', title: 'Write project proposal', body: '

Draft the initial project proposal document.

', creator: 'user-2', owner: 'user-2' }, + { id: 'todo-3', created_date: '2020-02-20T09:00:00Z', modified_date: '2020-02-20T09:00:00Z', title: 'Design database schema', body: '

Define entities, relationships and indexes.

', creator: 'user-1', owner: 'user-3' }, + { id: 'todo-4', created_date: '2021-05-10T14:00:00Z', modified_date: '2021-05-10T14:00:00Z', title: 'Implement authentication', body: '

Add JWT-based authentication to the API.

', creator: 'user-3', owner: 'user-4' }, + { id: 'todo-5', created_date: '2022-01-18T11:30:00Z', modified_date: '2022-03-05T16:00:00Z', title: 'Add unit tests', body: '

Achieve 80% test coverage for the core module.

', creator: 'user-2', owner: 'user-5' }, + { id: 'todo-6', created_date: '2022-08-22T08:00:00Z', modified_date: '2023-02-14T09:00:00Z', title: 'Performance profiling', body: '

Profile API endpoints and fix bottlenecks.

', creator: 'user-4', owner: 'user-4' }, + { id: 'todo-7', created_date: '2023-04-05T10:00:00Z', modified_date: '2023-07-01T13:00:00Z', title: 'Integrate third-party payment provider', body: '

Connect Stripe for billing.

', creator: 'user-5', owner: 'user-6' }, + { id: 'todo-8', created_date: '2023-11-01T08:30:00Z', modified_date: '2024-02-10T10:00:00Z', title: 'Migrate to TypeScript', body: '

Convert remaining JS files to TypeScript.

', creator: 'user-6', owner: 'user-7' }, + { id: 'todo-9', created_date: '2024-03-12T09:00:00Z', modified_date: '2024-06-20T15:00:00Z', title: 'Implement dark mode', body: '

Add dark mode toggle and persist preference.

', creator: 'user-7', owner: 'user-8' }, + { id: 'todo-10', created_date: '2024-07-01T11:00:00Z', modified_date: '2024-09-30T12:00:00Z', title: 'Set up CI/CD pipeline', body: '

Configure GitHub Actions for build and deploy.

', creator: 'user-8', owner: 'user-9' }, + { id: 'todo-11', created_date: '2024-10-15T08:00:00Z', modified_date: '2025-01-08T09:30:00Z', title: 'Refactor API error handling', body: '

Standardise error responses across all endpoints.

', creator: 'user-9', owner: 'user-10' }, + { id: 'todo-12', created_date: '2025-02-01T10:00:00Z', modified_date: '2025-03-15T11:00:00Z', title: 'Add search functionality', body: '

Implement full-text search with filters.

', creator: 'user-10', owner: 'user-1' }, + { id: 'todo-13', created_date: '2025-05-20T09:00:00Z', modified_date: '2025-07-10T14:00:00Z', title: 'Optimise database queries', body: '

Add missing indexes and rewrite slow queries.

', creator: 'user-1', owner: 'user-2' }, + { id: 'todo-14', created_date: '2025-09-01T08:00:00Z', modified_date: '2025-11-20T16:30:00Z', title: 'Write API documentation', body: '

Document all public endpoints using OpenAPI spec.

', creator: 'user-2', owner: 'user-3' }, + { id: 'todo-15', created_date: '2026-01-10T09:00:00Z', modified_date: '2026-03-01T10:00:00Z', title: 'Launch v2.0', body: '

Final QA pass and production deployment.

', creator: 'user-3', owner: 'user-4' }, +]; + +const ATTACHMENTS: ExternalAttachment[] = [ + { id: 'att-1', url: 'https://www.devrev.ai/favicon.ico', file_name: 'devrev-icon.ico', author_id: 'user-1', parent_id: 'todo-1' }, + { id: 'att-2', url: 'https://www.devrev.ai/favicon.ico', file_name: 'devrev-icon.ico', author_id: 'user-2', parent_id: 'todo-5' }, +]; + +// --------------------------------------------------------------------------- +// In-memory mutable maps for create/update operations (loading direction) +// --------------------------------------------------------------------------- +const todosStore = new Map(TODOS.map((t) => [t.id, { ...t }])); + +// --------------------------------------------------------------------------- +// Pagination helper +// +// In a real connector, pagination would be driven by cursor or offset/limit +// parameters on the external API. Here we simulate it with a fixed page size +// so the pattern is visible even though the data fits in a single page. +// --------------------------------------------------------------------------- +const PAGE_SIZE = 5; + +function paginateAndFilter( + items: T[], + modifiedAfter: string, + modifiedBefore: string +): T[] { + // CPv2: Both timestamps are always valid ISO 8601 strings provided by the SDK. + // extract_from = UNIX epoch (1970-01-01T00:00:00.000Z) for a full sync, or the + // last sync boundary for an ongoing sync. extract_to = current time. + // The connector never needs to branch on mode — it always uses the window. + const afterMs = new Date(modifiedAfter).getTime(); + const beforeMs = new Date(modifiedBefore).getTime(); + + const filtered = items.filter((item) => { + const modifiedMs = new Date(item.modified_date).getTime(); + return modifiedMs >= afterMs && modifiedMs <= beforeMs; + }); + + // Simulate paginated retrieval. In a real connector you would loop here, + // calling the API with limit/offset (or a cursor) until hasMore is false. + const result: T[] = []; + for (let offset = 0; offset < filtered.length; offset += PAGE_SIZE) { + const page = filtered.slice(offset, offset + PAGE_SIZE); + result.push(...page); + // In production: if (page.length < PAGE_SIZE) break; // no more pages + } + + return result; +} + +// --------------------------------------------------------------------------- +// HttpClient +// --------------------------------------------------------------------------- + export class HttpClient { private apiEndpoint: string; private apiToken: string; constructor(event: AirdropEvent) { - // TODO: Replace with API endpoint of the external system. This is passed through - // the event payload. + // TODO: Replace with the API endpoint of the external system. This is + // passed through the event payload (e.g. event.payload.connection_data.org_id + // or a keyring subdomain field). this.apiEndpoint = ''; - // TODO: Replace with API token of the external system. This is passed - // through the event payload. Configuration for the token is defined in manifest.yaml. + // TODO: Replace with the API token of the external system. This is passed + // through the event payload. The configuration for the token is defined in + // manifest.yaml under keyring_types. this.apiToken = event.payload.connection_data.key; } - // TODO: Replace with the actual function to fetch external sync units from - // the external system. + // TODO: Replace with actual API calls that fetch external sync units (e.g. + // repos, projects, boards) from the external system. async getTodoLists(): Promise { - return new Promise((resolve) => { - resolve([ - { - id: '1', - name: 'Todo List', - description: 'This is a todo list', - item_count: 2, - item_type: 'todos', - }, - ]); - }); + return TODO_LISTS; } - // TODO: Replace with the actual function to fetch list of items from the - // external system. - async getTodos(): Promise { - return new Promise((resolve) => { - resolve([ - { - id: 'todo-1', - created_date: '1999-12-25T01:00:03+01:00', - modified_date: '1999-12-25T01:00:03+01:00', - body: '

This is Todo 1

', - creator: 'user-1', - owner: 'user-1', - title: 'Todo 1', - }, - { - id: 'todo-2', - created_date: '1999-12-27T15:31:34+01:00', - modified_date: '2002-04-09T01:55:31+02:00', - body: '

This is Todo 2

', - creator: 'user-2', - owner: 'user-2', - title: 'Todo 2', - }, - ]); - }); + // TODO: Replace with actual API calls that fetch todos from the external system. + // + // Both `modifiedAfter` and `modifiedBefore` are always valid ISO 8601 strings + // resolved by the SDK from CPv2 extraction_start_time / extraction_end_time. + // For a full initial sync, modifiedAfter is the UNIX epoch + // ("1970-01-01T00:00:00.000Z") and modifiedBefore is the current time, so + // every item passes the filter. For an ongoing sync the platform sets a + // narrower window covering only recently changed data. + async getTodos(modifiedAfter: string, modifiedBefore: string): Promise { + return paginateAndFilter([...todosStore.values()], modifiedAfter, modifiedBefore); } - // TODO: Replace with the actual function to fetch list of users from the - // external system. - async getUsers(): Promise { - return new Promise((resolve) => { - resolve([ - { - id: 'user-1', - created_date: '1999-12-25T01:00:03+01:00', - modified_date: '1999-12-25T01:00:03+01:00', - email: 'johndoe@test.com', - name: 'John Doe', - }, - { - id: 'user-2', - created_date: '1999-12-27T15:31:34+01:00', - modified_date: '2002-04-09T01:55:31+02:00', - email: 'janedoe@test.com', - name: 'Jane Doe', - }, - ]); - }); + // TODO: Replace with actual API calls that fetch users from the external system. + async getUsers(modifiedAfter: string, modifiedBefore: string): Promise { + return paginateAndFilter(USERS, modifiedAfter, modifiedBefore); } - // TODO: Replace with the actual function to fetch list of attachments from - // the external system. - async getAttachments(): Promise { - return new Promise((resolve) => { - resolve([ - { - url: 'https://app.devrev.ai/favicon.ico', - id: 'attachment-1', - file_name: 'favicon1.ico', - author_id: 'user-1', - parent_id: 'todo-1', - }, - { - url: 'https://app.devrev.ai/favicon.ico', - id: 'attachment-2', - file_name: 'favicon2.ico', - author_id: 'user-2', - parent_id: 'todo-2', - }, - ]); + // TODO: Replace with actual API calls that fetch attachments from the + // external system. + async getAttachments(modifiedAfter: string, modifiedBefore: string): Promise { + // Attachments don't carry a modified_date in this mock, so we derive one + // from their parent todo's modified_date to keep the same filtering logic. + const attachmentsWithDates = ATTACHMENTS.map((att) => { + const parent = todosStore.get(att.parent_id); + return { + ...att, + modified_date: parent?.modified_date ?? '1970-01-01T00:00:00.000Z', + }; }); + + return paginateAndFilter(attachmentsWithDates, modifiedAfter, modifiedBefore); } - // TODO: Replace with the actual function to create an item in the external system. - // eslint-disable-next-line @typescript-eslint/no-unused-vars + // TODO: Replace with an actual API call that creates a todo in the external system. + // The function must return { id } on success or { error } on failure. async createTodo(todo: ExternalTodo): Promise { - return { error: 'Could not create todo in external system.' }; + const id = `todo-${Date.now()}`; + todosStore.set(id, { ...todo, id, modified_date: new Date().toISOString() }); + return { id }; } - // TODO: Replace with the actual function to update an item in the external system. - // eslint-disable-next-line @typescript-eslint/no-unused-vars + // TODO: Replace with an actual API call that updates a todo in the external system. + // The function must return { id } on success or { error } on failure. async updateTodo(todo: ExternalTodo): Promise { - return { error: 'Could not update todo in external system.' }; + if (!todosStore.has(todo.id)) { + return { error: `Todo with id "${todo.id}" not found in external system.` }; + } + todosStore.set(todo.id, { ...todo, modified_date: new Date().toISOString() }); + return { id: todo.id }; } - // TODO: Replace with the actual function to create an attachment in the external system. - // eslint-disable-next-line @typescript-eslint/no-unused-vars + // TODO: Replace with an actual API call that creates an attachment in the external system. + // The function must return { id } on success or { error } on failure. async createAttachment(attachment: ExternalAttachment): Promise { - return { error: 'Could not create attachment in external system.' }; + return { id: attachment.id }; } } diff --git a/code/src/functions/extraction/workers/data-extraction.ts b/code/src/functions/extraction/workers/data-extraction.ts index 5a0f731..d8298a4 100644 --- a/code/src/functions/extraction/workers/data-extraction.ts +++ b/code/src/functions/extraction/workers/data-extraction.ts @@ -6,8 +6,7 @@ import { ExtractorState } from '../index'; import { ExternalTodo, ExternalUser, ExternalAttachment } from '../../external-system/types'; // TODO: Replace with actual repos that will be used to store the -// data extracted from the external system. For example, you might want to -// create repos for todos, users, and attachments. Also replace and modify +// data extracted from the external system. Also replace and modify // the normalization functions which are used to normalize the data. const repos = [ { @@ -24,27 +23,42 @@ const repos = [ }, ]; -// TODO: Replace with item types you want to extract from the external system. -// Also replace the extract functions with the actual functions that will be -// used to extract the data. You can use this to easier iterate over the item -// types and extract them. +// TODO: Replace with the item types you want to extract from the external system. +// The extractFunction receives the time window (extract_from, extract_to) resolved +// by the SDK from the CPv2 extraction_start_time / extraction_end_time fields. interface ItemTypeToExtract { name: 'todos' | 'users' | 'attachments'; - extractFunction: (client: HttpClient) => Promise; + extractFunction: (client: HttpClient, modifiedAfter: string, modifiedBefore: string) => Promise; + /** + * When true, the CPv2 time window is ignored and all items are always + * extracted. Use this for reference/identity data (e.g. users) that other + * item types depend on — extracting only a recent window would leave + * references to older records unresolvable. + */ + alwaysFullExtract?: boolean; } +// Sentinel timestamps used when alwaysFullExtract is true. +const EPOCH = '1970-01-01T00:00:00.000Z'; +const FAR_FUTURE = '9999-12-31T23:59:59.999Z'; + const itemTypesToExtract: ItemTypeToExtract[] = [ { name: 'todos', - extractFunction: (client: HttpClient) => client.getTodos(), + extractFunction: (client, modifiedAfter, modifiedBefore) => client.getTodos(modifiedAfter, modifiedBefore), }, { + // Users are always fully extracted regardless of the CPv2 time window. + // They are identity/reference data: todos reference users via creator and + // owner fields. If only recently-modified users were extracted, older users + // would be missing and those references could not be resolved. name: 'users', - extractFunction: (client: HttpClient) => client.getUsers(), + extractFunction: (client, modifiedAfter, modifiedBefore) => client.getUsers(modifiedAfter, modifiedBefore), + alwaysFullExtract: true, }, { name: 'attachments', - extractFunction: (client: HttpClient) => client.getAttachments(), + extractFunction: (client, modifiedAfter, modifiedBefore) => client.getAttachments(modifiedAfter, modifiedBefore), }, ]; @@ -52,19 +66,51 @@ processTask({ task: async ({ adapter }) => { adapter.initializeRepos(repos); - // TODO: Replace with HTTP client that will be used to make API calls + // TODO: Replace with the HTTP client that will be used to make API calls // to the external system. const httpClient = new HttpClient(adapter.event); + // CPv2: The SDK always resolves extract_from and extract_to from the + // platform's extraction_start_time / extraction_end_time to concrete ISO + // 8601 timestamps before calling the connector. + // + // For a full initial sync extract_from is the UNIX epoch + // ("1970-01-01T00:00:00.000Z") and extract_to is the current time, so + // every item passes the filter. For an ongoing / time-scoped sync the + // platform sets a narrower window covering only data that changed inside + // that range. + // + // The connector never needs to inspect or branch on the sync mode — it + // always uses the provided window to filter data from the external system. + const { extract_from, extract_to } = adapter.event.payload.event_context; + // TODO: Replace with your implementation to extract data from the external - // system. This is just an example how you can iterate over the item types, - // extract them, push them to the repo, and save the state. + // system. This example iterates over item types, respects the extraction + // scope (selective extraction) and the CPv2 time window, pushes items to + // the repo, and saves progress to state. for (const itemTypeToExtract of itemTypesToExtract) { - // If the worker is about to time out, exit early so that `onTimeout` can run and emit progress. - if(adapter.isTimeout) { + // If the worker is about to time out, exit early so that `onTimeout` + // can run and emit progress back to the platform. + if (adapter.isTimeout) { return; } - const items = await itemTypeToExtract.extractFunction(httpClient); + + // Selective extraction: skip item types that are not in scope according + // to the user's sync recipe configuration. The platform communicates + // this through the extraction scope attached to the adapter state. + // Defaults to true if the item type is not listed in the scope. + if (!adapter.shouldExtract(itemTypeToExtract.name)) { + adapter.state[itemTypeToExtract.name].completed = true; + continue; + } + + const items = await itemTypeToExtract.extractFunction( + httpClient, + // Both timestamps are always present ISO strings — no null check needed. + // Items with alwaysFullExtract bypass the CPv2 window entirely. + itemTypeToExtract.alwaysFullExtract ? EPOCH : extract_from as string, + itemTypeToExtract.alwaysFullExtract ? FAR_FUTURE : extract_to as string + ); await adapter.getRepo(itemTypeToExtract.name)?.push(items); adapter.state[itemTypeToExtract.name].completed = true; } diff --git a/code/src/functions/loading/workers/load-attachments.ts b/code/src/functions/loading/workers/load-attachments.ts index fb440d6..b8e988e 100644 --- a/code/src/functions/loading/workers/load-attachments.ts +++ b/code/src/functions/loading/workers/load-attachments.ts @@ -5,15 +5,15 @@ import { processTask, } from '@devrev/ts-adaas'; +import { denormalizeAttachment } from '../../external-system/data-denormalization'; import { HttpClient } from '../../external-system/http-client'; import { LoaderState } from '../index'; -import { ExternalAttachment } from 'functions/external-system/types'; /* eslint-disable @typescript-eslint/no-unused-vars */ // TODO: Replace with your create function that will be used to make API calls // to the external system to create a new attachment. Function must return -// object with http stream or error depending on the response from the external system. +// object with id or error depending on the response from the external system. async function createAttachment({ item, mappers, event }: ExternalSystemItemLoadingParams) { // TODO: Replace with your HTTP client that will be used to make API calls // to the external system. @@ -41,8 +41,3 @@ processTask({ }); }, }); - -function denormalizeAttachment(item: ExternalSystemAttachment): ExternalAttachment { - throw new Error('Function not implemented.'); -} - diff --git a/code/src/main.ts b/code/src/main.ts index 55e969a..98c5f13 100644 --- a/code/src/main.ts +++ b/code/src/main.ts @@ -11,16 +11,16 @@ import { testRunner } from './test-runner/test-runner'; }, functionName: { type: 'string', - require: true, + require: false, }, }).argv; - if (!argv.fixturePath || !argv.functionName) { - console.error('Please make sure you have passed fixturePath & functionName'); + if (!argv.fixturePath) { + console.error('Please make sure you have fixturePath in your command'); } await testRunner({ fixturePath: argv.fixturePath, - functionName: argv.functionName as FunctionFactoryType, + functionName: argv.functionName as FunctionFactoryType | undefined, }); })(); diff --git a/code/src/test-runner/test-runner.ts b/code/src/test-runner/test-runner.ts index 5429875..025bfd6 100644 --- a/code/src/test-runner/test-runner.ts +++ b/code/src/test-runner/test-runner.ts @@ -1,45 +1,147 @@ -import { AirdropEvent } from '@devrev/ts-adaas'; import * as dotenv from 'dotenv'; +import * as fs from 'fs'; +import * as path from 'path'; + +import { AirdropEvent, createMockEvent, DeepPartial, EventType, MockServer } from '@devrev/ts-adaas'; + import { functionFactory, FunctionFactoryType } from '../function-factory'; -export interface TestRunnerProps { - functionName: FunctionFactoryType; +export interface LocalRunnerProps { fixturePath: string; + functionName?: FunctionFactoryType; } -export function addCredentials(events: AirdropEvent[], env: dotenv.DotenvParseOutput): AirdropEvent[] { - return events.map((event: AirdropEvent) => { - return { - ...event, - context: { - ...event.context, - secrets: { - ...event.context.secrets, - service_account_token: env['DEVREV_PAT'], - }, - }, - }; + +/** + * Replaces `${VAR_NAME}` placeholders with values from `process.env`. + * Values are JSON-escaped so special characters don't break the JSON structure. + */ +function resolveEnvVariables(raw: string, filePath: string): string { + return raw.replace(/\$\{(\w+)\}/g, (_match, varName: string) => { + const value = process.env[varName]; + if (value === undefined) { + throw new Error( + `Environment variable "${varName}" referenced in ${filePath} is not set. ` + + 'Make sure it is defined in your .env file or exported in your shell.' + ); + } + return JSON.stringify(value).slice(1, -1); }); } -export const testRunner = async ({ functionName, fixturePath }: TestRunnerProps) => { - const env = dotenv.config(); +function readFixtureFile(filePath: string): T | undefined { + if (!fs.existsSync(filePath)) { + return undefined; + } + const raw = fs.readFileSync(filePath, 'utf-8').trim(); + if (raw.length === 0) { + return undefined; + } + const resolved = resolveEnvVariables(raw, filePath); + return JSON.parse(resolved) as T; +} + +function resolveEventType(input: string): EventType { + const match = Object.values(EventType).find((v) => v === input); + if (match) return match as EventType; + + throw new Error(`Unknown event_type "${input}". Must be one of: ${Object.values(EventType).join(', ')}`); +} + +export const testRunner = async ({ fixturePath, functionName }: LocalRunnerProps) => { + dotenv.config(); - console.log('env:', env); + const fixturesDir = path.resolve(__dirname, '../../fixtures', fixturePath); + if (!fs.existsSync(fixturesDir)) { + throw new Error(`Fixture directory not found: ${fixturesDir}`); + } + return runWithFixtureDir(fixturesDir, functionName); +}; - if (!functionFactory[functionName]) { - console.error(`${functionName} is not found in the functionFactory`); - console.error('Add your function to the function-factory.ts file'); - throw new Error('Function is not found in the functionFactory'); +function getFunctionName(event_type: string): FunctionFactoryType { + if (event_type.indexOf('EXTRACT') != -1) { + return 'extraction'; + } else if (event_type.indexOf('LOAD') != -1) { + return 'loading'; } - const run = functionFactory[functionName]; + throw new Error(`No functionName found for event ${event_type}. Specify functionName using '--functionName' parameter.`); +} + +async function runWithFixtureDir(fixturesDir: string, functionName?: FunctionFactoryType) { + const eventPath = path.join(fixturesDir, 'event.json'); + const statePath = path.join(fixturesDir, 'state.json'); + const extractionScopePath = path.join(fixturesDir, 'extraction_scope.json'); - // eslint-disable-next-line @typescript-eslint/no-require-imports - const eventFixture = require(`../fixtures/${fixturePath}`); + const fixtureEvent = readFixtureFile>(eventPath); + const fixtureState = readFixtureFile>(statePath); + const fixtureExtractionScope = readFixtureFile>(extractionScopePath); + + if (!fixtureEvent) { + throw new Error( + `Missing or empty event.json in fixture directory: ${eventPath}. ` + + 'Every fixture must have an event.json with at least an "event_type" field.' + ); + } + + if (!fixtureEvent.payload?.event_type) { + throw new Error( + `event.json at ${eventPath} is missing the required "event_type" field. ` + + 'Specify an event type such as "START_EXTRACTING_DATA" or "START_EXTRACTING_EXTERNAL_SYNC_UNITS".' + ); + } - if (env.parsed) { - await run(addCredentials(eventFixture, env.parsed)); + const resolvedFunctionName = functionName ?? getFunctionName(fixtureEvent.payload?.event_type); + + if (!resolvedFunctionName) { + throw new Error( + 'No function name provided. Either pass --functionName on the CLI ' + 'or set "function_name" in event.json.' + ); + } + + if (!(resolvedFunctionName in functionFactory)) { + throw new Error( + `Function "${resolvedFunctionName}" not found in functionFactory. ` + + `Available: ${Object.keys(functionFactory).join(', ')}` + ); + } + + const eventType = resolveEventType(fixtureEvent.payload?.event_type); + + console.log(`[test-runner] Function : ${resolvedFunctionName}`); + console.log(`[test-runner] Event : ${eventType}`); + console.log(`[test-runner] Fixture : ${fixturesDir}`); + + const mockServer = new MockServer(0); + await mockServer.start(); + + console.log(`[test-runner] MockServer started on ${mockServer.baseUrl}`); + + mockServer.setRoute({ + path: '/worker_data_url.get', + method: 'GET', + status: 200, + body: { state: JSON.stringify(fixtureState ?? {}), objects: JSON.stringify(fixtureExtractionScope ?? {}) }, + }); + if (fixtureState) { + console.log(`[test-runner] Injected state from state.json`); } else { - await run(eventFixture); + console.log(`[test-runner] No state.json found — using default empty state`); } -}; + + const event = createMockEvent(mockServer.baseUrl, fixtureEvent); + + process.argv.push("--local"); + + const run = functionFactory[resolvedFunctionName]; + + try { + await run([event]); + console.log(`[test-runner] Function completed successfully`); + } catch (err) { + console.error(`[test-runner] Function threw an error:`, err); + process.exitCode = 1; + } finally { + await mockServer.stop(); + console.log(`[test-runner] MockServer stopped`); + } +} diff --git a/manifest.yaml b/manifest.yaml index 2be4231..fd0590c 100644 --- a/manifest.yaml +++ b/manifest.yaml @@ -88,8 +88,7 @@ imports: # and with your code, if you changed the names. extractor_function: extraction - # TODO: Uncomment the loader function once you have implemented loading. - #loader_function: loading + loader_function: loading # The list of connection types that are available for the external system. # Make sure these are in sync with the keyring types in the 'keyring_types' section above. From 580cb6d6a044f2a730104d88989457d852001d4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gas=CC=8Cper=20Zgonec?= Date: Thu, 23 Apr 2026 10:13:47 +0200 Subject: [PATCH 2/2] Removed CPv2 simulation. --- README.md | 15 +++++ .../fixtures/start_extracting_data/event.json | 4 +- .../extraction_scope.json | 11 ---- .../event.json | 6 +- .../event.json | 16 ----- .../extraction_scope.json | 5 -- .../state.json | 5 -- .../event.json | 2 +- code/fixtures/start_loading_data/event.json | 2 +- .../functions/external-system/http-client.ts | 59 ++++--------------- .../extraction/workers/data-extraction.ts | 54 ++++------------- code/src/test-runner/test-runner.ts | 14 ++++- manifest.yaml | 5 ++ 13 files changed, 57 insertions(+), 141 deletions(-) delete mode 100644 code/fixtures/start_extracting_data/extraction_scope.json delete mode 100644 code/fixtures/start_extracting_data_with_timestamps/event.json delete mode 100644 code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json delete mode 100644 code/fixtures/start_extracting_data_with_timestamps/state.json diff --git a/README.md b/README.md index e713cb8..8ecb553 100644 --- a/README.md +++ b/README.md @@ -17,3 +17,18 @@ the name, according to Step 2 above. While developing the AirSync snap-in, make sure to refer to the [AirSync snap-in documentation](https://developer.devrev.ai/airsync). + +## Local Fixture Testing + +From the `code/` directory, you can run the built-in fixture runner to test the +template locally: + +```bash +npm start -- --fixturePath start_extracting_external_sync_units +npm start -- --fixturePath start_extracting_data +npm start -- --fixturePath start_extracting_data_selective +npm start -- --fixturePath start_loading_data +``` + +The runner reads `code/.env` and resolves `${TODO_API_KEY}` in fixture files, +so copy `code/.env.example` before running the fixtures. diff --git a/code/fixtures/start_extracting_data/event.json b/code/fixtures/start_extracting_data/event.json index d1cb3aa..d076c25 100644 --- a/code/fixtures/start_extracting_data/event.json +++ b/code/fixtures/start_extracting_data/event.json @@ -8,9 +8,7 @@ }, "event_type": "START_EXTRACTING_DATA", "event_context": { - "external_sync_unit_id": "test_external_sync_unit_id", - "extract_from": "1970-01-01T00:00:00.000Z", - "extract_to": "2099-12-31T23:59:59.999Z" + "external_sync_unit_id": "test_external_sync_unit_id" } } } diff --git a/code/fixtures/start_extracting_data/extraction_scope.json b/code/fixtures/start_extracting_data/extraction_scope.json deleted file mode 100644 index d679c3b..0000000 --- a/code/fixtures/start_extracting_data/extraction_scope.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "todos": { - "extract": true - }, - "users": { - "extract": true - }, - "attachments": { - "extract": true - } -} diff --git a/code/fixtures/start_extracting_data_selective/event.json b/code/fixtures/start_extracting_data_selective/event.json index 5fac93a..d076c25 100644 --- a/code/fixtures/start_extracting_data_selective/event.json +++ b/code/fixtures/start_extracting_data_selective/event.json @@ -3,14 +3,12 @@ "connection_data": { "org_id": "test_org", "org_name": "Test Organization", - "key": "test-api-key", + "key": "${TODO_API_KEY}", "key_type": "pat" }, "event_type": "START_EXTRACTING_DATA", "event_context": { - "external_sync_unit_id": "test_external_sync_unit_id", - "extract_from": "1970-01-01T00:00:00.000Z", - "extract_to": "2099-12-31T23:59:59.999Z" + "external_sync_unit_id": "test_external_sync_unit_id" } } } diff --git a/code/fixtures/start_extracting_data_with_timestamps/event.json b/code/fixtures/start_extracting_data_with_timestamps/event.json deleted file mode 100644 index 9d13ae2..0000000 --- a/code/fixtures/start_extracting_data_with_timestamps/event.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "payload": { - "connection_data": { - "org_id": "test_org", - "org_name": "Test Organization", - "key": "test-api-key", - "key_type": "pat" - }, - "event_type": "START_EXTRACTING_DATA", - "event_context": { - "external_sync_unit_id": "test_external_sync_unit_id", - "extract_from": "2024-01-01T00:00:00.000Z", - "extract_to": "2026-04-14T00:00:00.000Z" - } - } -} diff --git a/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json b/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json deleted file mode 100644 index 3562132..0000000 --- a/code/fixtures/start_extracting_data_with_timestamps/extraction_scope.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "todos": { "extract": true }, - "users": { "extract": true }, - "attachments": { "extract": true } -} diff --git a/code/fixtures/start_extracting_data_with_timestamps/state.json b/code/fixtures/start_extracting_data_with_timestamps/state.json deleted file mode 100644 index 3d5b4c0..0000000 --- a/code/fixtures/start_extracting_data_with_timestamps/state.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "todos": { "completed": false }, - "users": { "completed": false }, - "attachments": { "completed": false } -} diff --git a/code/fixtures/start_extracting_external_sync_units/event.json b/code/fixtures/start_extracting_external_sync_units/event.json index 7a9e79e..2c80b01 100644 --- a/code/fixtures/start_extracting_external_sync_units/event.json +++ b/code/fixtures/start_extracting_external_sync_units/event.json @@ -6,7 +6,7 @@ "connection_data": { "org_id": "test_org", "org_name": "Test Organization", - "key": "test-api-key", + "key": "${TODO_API_KEY}", "key_type": "pat" } } diff --git a/code/fixtures/start_loading_data/event.json b/code/fixtures/start_loading_data/event.json index 0b23b9f..2397ca5 100644 --- a/code/fixtures/start_loading_data/event.json +++ b/code/fixtures/start_loading_data/event.json @@ -3,7 +3,7 @@ "connection_data": { "org_id": "test_org", "org_name": "Test Organization", - "key": "test-api-key", + "key": "${TODO_API_KEY}", "key_type": "pat" }, "event_type": "START_LOADING_DATA", diff --git a/code/src/functions/external-system/http-client.ts b/code/src/functions/external-system/http-client.ts index 9fb20ba..c729d68 100644 --- a/code/src/functions/external-system/http-client.ts +++ b/code/src/functions/external-system/http-client.ts @@ -6,12 +6,6 @@ import { ExternalAttachment, ExternalTodo, ExternalTodoList, ExternalUser } from // // This simulates an external system's database. In a real connector you would // replace the methods below with actual HTTP calls to your external API. -// -// The items span a wide range of dates so that the time-window filtering -// behaviour (driven by CPv2's extract_from / extract_to) is clearly visible: -// - Some items are old (pre-2020) and will only appear in a full sync. -// - Some items are recent (2024-2026) and will appear in both full and -// narrow-window syncs. // --------------------------------------------------------------------------- const USERS: ExternalUser[] = [ @@ -70,28 +64,12 @@ const todosStore = new Map(TODOS.map((t) => [t.id, { ...t // --------------------------------------------------------------------------- const PAGE_SIZE = 5; -function paginateAndFilter( - items: T[], - modifiedAfter: string, - modifiedBefore: string -): T[] { - // CPv2: Both timestamps are always valid ISO 8601 strings provided by the SDK. - // extract_from = UNIX epoch (1970-01-01T00:00:00.000Z) for a full sync, or the - // last sync boundary for an ongoing sync. extract_to = current time. - // The connector never needs to branch on mode — it always uses the window. - const afterMs = new Date(modifiedAfter).getTime(); - const beforeMs = new Date(modifiedBefore).getTime(); - - const filtered = items.filter((item) => { - const modifiedMs = new Date(item.modified_date).getTime(); - return modifiedMs >= afterMs && modifiedMs <= beforeMs; - }); - +function paginate(items: T[]): T[] { // Simulate paginated retrieval. In a real connector you would loop here, // calling the API with limit/offset (or a cursor) until hasMore is false. const result: T[] = []; - for (let offset = 0; offset < filtered.length; offset += PAGE_SIZE) { - const page = filtered.slice(offset, offset + PAGE_SIZE); + for (let offset = 0; offset < items.length; offset += PAGE_SIZE) { + const page = items.slice(offset, offset + PAGE_SIZE); result.push(...page); // In production: if (page.length < PAGE_SIZE) break; // no more pages } @@ -122,40 +100,23 @@ export class HttpClient { // TODO: Replace with actual API calls that fetch external sync units (e.g. // repos, projects, boards) from the external system. async getTodoLists(): Promise { - return TODO_LISTS; + return paginate(TODO_LISTS); } // TODO: Replace with actual API calls that fetch todos from the external system. - // - // Both `modifiedAfter` and `modifiedBefore` are always valid ISO 8601 strings - // resolved by the SDK from CPv2 extraction_start_time / extraction_end_time. - // For a full initial sync, modifiedAfter is the UNIX epoch - // ("1970-01-01T00:00:00.000Z") and modifiedBefore is the current time, so - // every item passes the filter. For an ongoing sync the platform sets a - // narrower window covering only recently changed data. - async getTodos(modifiedAfter: string, modifiedBefore: string): Promise { - return paginateAndFilter([...todosStore.values()], modifiedAfter, modifiedBefore); + async getTodos(): Promise { + return paginate([...todosStore.values()]); } // TODO: Replace with actual API calls that fetch users from the external system. - async getUsers(modifiedAfter: string, modifiedBefore: string): Promise { - return paginateAndFilter(USERS, modifiedAfter, modifiedBefore); + async getUsers(): Promise { + return paginate(USERS); } // TODO: Replace with actual API calls that fetch attachments from the // external system. - async getAttachments(modifiedAfter: string, modifiedBefore: string): Promise { - // Attachments don't carry a modified_date in this mock, so we derive one - // from their parent todo's modified_date to keep the same filtering logic. - const attachmentsWithDates = ATTACHMENTS.map((att) => { - const parent = todosStore.get(att.parent_id); - return { - ...att, - modified_date: parent?.modified_date ?? '1970-01-01T00:00:00.000Z', - }; - }); - - return paginateAndFilter(attachmentsWithDates, modifiedAfter, modifiedBefore); + async getAttachments(): Promise { + return paginate(ATTACHMENTS); } // TODO: Replace with an actual API call that creates a todo in the external system. diff --git a/code/src/functions/extraction/workers/data-extraction.ts b/code/src/functions/extraction/workers/data-extraction.ts index d8298a4..a4c9c02 100644 --- a/code/src/functions/extraction/workers/data-extraction.ts +++ b/code/src/functions/extraction/workers/data-extraction.ts @@ -24,41 +24,23 @@ const repos = [ ]; // TODO: Replace with the item types you want to extract from the external system. -// The extractFunction receives the time window (extract_from, extract_to) resolved -// by the SDK from the CPv2 extraction_start_time / extraction_end_time fields. interface ItemTypeToExtract { name: 'todos' | 'users' | 'attachments'; - extractFunction: (client: HttpClient, modifiedAfter: string, modifiedBefore: string) => Promise; - /** - * When true, the CPv2 time window is ignored and all items are always - * extracted. Use this for reference/identity data (e.g. users) that other - * item types depend on — extracting only a recent window would leave - * references to older records unresolvable. - */ - alwaysFullExtract?: boolean; + extractFunction: (client: HttpClient) => Promise; } -// Sentinel timestamps used when alwaysFullExtract is true. -const EPOCH = '1970-01-01T00:00:00.000Z'; -const FAR_FUTURE = '9999-12-31T23:59:59.999Z'; - const itemTypesToExtract: ItemTypeToExtract[] = [ { name: 'todos', - extractFunction: (client, modifiedAfter, modifiedBefore) => client.getTodos(modifiedAfter, modifiedBefore), + extractFunction: (client) => client.getTodos(), }, { - // Users are always fully extracted regardless of the CPv2 time window. - // They are identity/reference data: todos reference users via creator and - // owner fields. If only recently-modified users were extracted, older users - // would be missing and those references could not be resolved. name: 'users', - extractFunction: (client, modifiedAfter, modifiedBefore) => client.getUsers(modifiedAfter, modifiedBefore), - alwaysFullExtract: true, + extractFunction: (client) => client.getUsers(), }, { name: 'attachments', - extractFunction: (client, modifiedAfter, modifiedBefore) => client.getAttachments(modifiedAfter, modifiedBefore), + extractFunction: (client) => client.getAttachments(), }, ]; @@ -70,24 +52,14 @@ processTask({ // to the external system. const httpClient = new HttpClient(adapter.event); - // CPv2: The SDK always resolves extract_from and extract_to from the - // platform's extraction_start_time / extraction_end_time to concrete ISO - // 8601 timestamps before calling the connector. - // - // For a full initial sync extract_from is the UNIX epoch - // ("1970-01-01T00:00:00.000Z") and extract_to is the current time, so - // every item passes the filter. For an ongoing / time-scoped sync the - // platform sets a narrower window covering only data that changed inside - // that range. - // - // The connector never needs to inspect or branch on the sync mode — it - // always uses the provided window to filter data from the external system. - const { extract_from, extract_to } = adapter.event.payload.event_context; + // CPv2 note: if you enable TIME_SCOPED_SYNCS in manifest.yaml, read + // adapter.event.payload.event_context.extract_from / extract_to here and + // pass them to the external system query layer. // TODO: Replace with your implementation to extract data from the external // system. This example iterates over item types, respects the extraction - // scope (selective extraction) and the CPv2 time window, pushes items to - // the repo, and saves progress to state. + // scope (selective extraction), pushes items to the repo, and saves + // progress to state. for (const itemTypeToExtract of itemTypesToExtract) { // If the worker is about to time out, exit early so that `onTimeout` // can run and emit progress back to the platform. @@ -104,13 +76,7 @@ processTask({ continue; } - const items = await itemTypeToExtract.extractFunction( - httpClient, - // Both timestamps are always present ISO strings — no null check needed. - // Items with alwaysFullExtract bypass the CPv2 window entirely. - itemTypeToExtract.alwaysFullExtract ? EPOCH : extract_from as string, - itemTypeToExtract.alwaysFullExtract ? FAR_FUTURE : extract_to as string - ); + const items = await itemTypeToExtract.extractFunction(httpClient); await adapter.getRepo(itemTypeToExtract.name)?.push(items); adapter.state[itemTypeToExtract.name].completed = true; } diff --git a/code/src/test-runner/test-runner.ts b/code/src/test-runner/test-runner.ts index 025bfd6..f80cbe6 100644 --- a/code/src/test-runner/test-runner.ts +++ b/code/src/test-runner/test-runner.ts @@ -120,17 +120,27 @@ async function runWithFixtureDir(fixturesDir: string, functionName?: FunctionFac path: '/worker_data_url.get', method: 'GET', status: 200, - body: { state: JSON.stringify(fixtureState ?? {}), objects: JSON.stringify(fixtureExtractionScope ?? {}) }, + body: { + state: JSON.stringify(fixtureState ?? {}), + objects: JSON.stringify(fixtureExtractionScope ?? {}), + }, }); + if (fixtureState) { console.log(`[test-runner] Injected state from state.json`); } else { console.log(`[test-runner] No state.json found — using default empty state`); } + if (fixtureExtractionScope) { + console.log(`[test-runner] Injected extraction scope from extraction_scope.json`); + } else { + console.log(`[test-runner] No extraction_scope.json found — using default all-extract scope`); + } + const event = createMockEvent(mockServer.baseUrl, fixtureEvent); - process.argv.push("--local"); + process.argv.push('--local'); const run = functionFactory[resolvedFunctionName]; diff --git a/manifest.yaml b/manifest.yaml index fd0590c..4fa2f8c 100644 --- a/manifest.yaml +++ b/manifest.yaml @@ -98,4 +98,9 @@ imports: # The list of capabilities that the snap-in supports. # capabilities: # TODO: Uncomment if your snap-in supports time-scoped syncs. + # Use this only when your external API can honor a time window. + # The platform resolves `extraction_start_time` / `extraction_end_time` into + # `adapter.event.payload.event_context.extract_from` / `extract_to`; pass + # those values through to your external query layer instead of filtering + # mock data in the connector itself. # - TIME_SCOPED_SYNCS