From 379b5741ce28ca2ec351f0ec6da93fc923f39dea Mon Sep 17 00:00:00 2001 From: Jorge Cortes Date: Mon, 1 Sep 2025 14:02:22 -0500 Subject: [PATCH] [FIX] Airtable webhook sources intermittently emit the raw Airtable payload --- components/airtable_oauth/package.json | 4 +- .../sources/common/common-webhook-field.mjs | 5 -- .../sources/common/common-webhook-record.mjs | 32 +++++--- .../sources/common/common-webhook.mjs | 75 +++++++++++++------ .../sources/new-field/new-field.mjs | 5 +- ...ew-modified-or-deleted-records-instant.mjs | 8 +- .../new-or-modified-field.mjs | 5 +- .../new-or-modified-records.mjs | 8 +- .../sources/new-records/new-records.mjs | 5 +- pnpm-lock.yaml | 6 ++ 10 files changed, 104 insertions(+), 49 deletions(-) diff --git a/components/airtable_oauth/package.json b/components/airtable_oauth/package.json index f03f7a931b12d..4d7e3f891f769 100644 --- a/components/airtable_oauth/package.json +++ b/components/airtable_oauth/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/airtable_oauth", - "version": "0.5.1", + "version": "0.5.2", "description": "Pipedream Airtable (OAuth) Components", "main": "airtable_oauth.app.mjs", "keywords": [ @@ -15,7 +15,9 @@ "dependencies": { "@pipedream/platform": "^3.0.3", "airtable": "^0.11.1", + "async-retry": "^1.3.3", "bottleneck": "^2.19.5", + "crypto": "^1.0.1", "lodash.chunk": "^4.2.0", "lodash.isempty": "^4.4.0", "moment": "^2.30.1" diff --git a/components/airtable_oauth/sources/common/common-webhook-field.mjs b/components/airtable_oauth/sources/common/common-webhook-field.mjs index 90b206ab963eb..3113520626693 100644 --- a/components/airtable_oauth/sources/common/common-webhook-field.mjs +++ b/components/airtable_oauth/sources/common/common-webhook-field.mjs @@ -38,11 +38,6 @@ export default { fieldUpdateInfo, ] = Object.entries(fieldObj)[0]; - const timestamp = Date.parse(payload.timestamp); - if (this.isDuplicateEvent(fieldId, timestamp)) return; - this._setLastObjectId(fieldId); - this._setLastTimestamp(timestamp); - const updateType = operation === "createdFieldsById" ? "created" : "updated"; diff --git a/components/airtable_oauth/sources/common/common-webhook-record.mjs b/components/airtable_oauth/sources/common/common-webhook-record.mjs index a610d402e830a..362e41695f7df 100644 --- a/components/airtable_oauth/sources/common/common-webhook-record.mjs +++ b/components/airtable_oauth/sources/common/common-webhook-record.mjs @@ -1,4 +1,5 @@ import common from "./common-webhook.mjs"; +import retry from "async-retry"; export default { ...common, @@ -9,6 +10,17 @@ export default { "tableData", ]; }, + withRetries(apiCall, retries = 3) { + return retry(async (bail) => { + try { + return await apiCall(); + } catch (err) { + return bail(err); + } + }, { + retries, + }); + }, async emitEvent(payload) { const [ tableId, @@ -43,20 +55,20 @@ export default { recordUpdateInfo, ] = Object.entries(recordObj)[0]; - const timestamp = Date.parse(payload.timestamp); - if (this.isDuplicateEvent(recordId, timestamp)) return; - this._setLastObjectId(recordId); - this._setLastTimestamp(timestamp); - let updateType = operation === "createdRecordsById" ? "created" : "updated"; - const { fields } = await this.airtable.getRecord({ - baseId: this.baseId, - tableId, - recordId, - }); + let fields = {}; + try { + ({ fields } = await this.withRetries(() => this.airtable.getRecord({ + baseId: this.baseId, + tableId, + recordId, + }))); + } catch (e) { + fields = {}; + } const summary = `Record ${updateType}: ${fields?.name ?? recordId}`; diff --git a/components/airtable_oauth/sources/common/common-webhook.mjs b/components/airtable_oauth/sources/common/common-webhook.mjs index 8375fcc85fa22..495e2cfcb3fcf 100644 --- a/components/airtable_oauth/sources/common/common-webhook.mjs +++ b/components/airtable_oauth/sources/common/common-webhook.mjs @@ -1,3 +1,4 @@ +import { createHmac } from "crypto"; import airtable from "../../airtable_oauth.app.mjs"; import constants from "../common/constants.mjs"; @@ -48,7 +49,9 @@ export default { }, hooks: { async activate() { - const { id } = await this.airtable.createWebhook({ + const { + id, macSecretBase64, + } = await this.airtable.createWebhook({ baseId: this.baseId, data: { notificationUrl: `${this.http.endpoint}/`, @@ -76,6 +79,7 @@ export default { }, }); this._setHookId(id); + this._setMacSecretBase64(macSecretBase64); }, async deactivate() { const webhookId = this._getHookId(); @@ -94,28 +98,17 @@ export default { _setHookId(hookId) { this.db.set("hookId", hookId); }, - _getLastObjectId() { - return this.db.get("lastObjectId"); - }, - async _setLastObjectId(id) { - this.db.set("lastObjectId", id); + _getMacSecretBase64() { + return this.db.get("macSecretBase64"); }, - _getLastTimestamp() { - return this.db.get("lastTimestamp"); + _setMacSecretBase64(value) { + this.db.set("macSecretBase64", value); }, - async _setLastTimestamp(ts) { - this.db.set("lastTimestamp", ts); + _setLastCursor(cursor) { + this.db.set("lastCursor", cursor); }, - isDuplicateEvent(id, ts) { - const lastId = this._getLastObjectId(); - const lastTs = this._getLastTimestamp(); - - if (id === lastId && (ts - lastTs < 5000 )) { - console.log("Skipping trigger: another event was emitted for the same object within the last 5 seconds"); - return true; - } - - return false; + _getLastCursor() { + return this.db.get("lastCursor"); }, getSpecificationOptions() { throw new Error("getSpecificationOptions is not implemented"); @@ -135,7 +128,9 @@ export default { }, emitDefaultEvent(payload) { const meta = this.generateMeta(payload); - this.$emit(payload, meta); + this.$emit({ + originalPayload: payload, + }, meta); }, async emitEvent(payload) { // sources may call this to customize event emission, but it is @@ -147,30 +142,60 @@ export default { // and it can be silently ignored when not required return true; }, + isSignatureValid(signature, bodyRaw) { + const macSecretBase64FromCreate = this._getMacSecretBase64(); + const macSecretDecoded = Buffer.from(macSecretBase64FromCreate, "base64"); + const body = Buffer.from(bodyRaw, "utf8"); + const hmac = createHmac("sha256", macSecretDecoded) + .update(body.toString(), "ascii") + .digest("hex"); + const expectedContentHmac = "hmac-sha256=" + hmac; + return signature === expectedContentHmac; + }, + payloadFilter() { + return true; + }, }, - async run() { + async run({ + bodyRaw, headers: { ["x-airtable-content-mac"]: signature }, + }) { + const isValid = this.isSignatureValid(signature, bodyRaw); + if (!isValid) { + return this.http.respond({ + status: 401, + }); + } + this.http.respond({ status: 200, }); // webhook pings source, we then fetch webhook events to emit const webhookId = this._getHookId(); let hasMore = false; - const params = {}; + try { await this.saveAdditionalData(); } catch (err) { console.log("Error fetching additional data, proceeding to event emission"); console.log(err); } + const params = { + cursor: this._getLastCursor(), + }; + do { const { cursor, mightHaveMore, payloads, } = await this.airtable.listWebhookPayloads({ + debug: true, baseId: this.baseId, webhookId, params, }); - for (const payload of payloads) { + + const filteredPayloads = payloads.filter(this.payloadFilter); + + for (const payload of filteredPayloads) { try { await this.emitEvent(payload); } catch (err) { @@ -182,5 +207,7 @@ export default { params.cursor = cursor; hasMore = mightHaveMore; } while (hasMore); + + this._setLastCursor(params.cursor); }, }; diff --git a/components/airtable_oauth/sources/new-field/new-field.mjs b/components/airtable_oauth/sources/new-field/new-field.mjs index eddcfc85faec4..266222977f81a 100644 --- a/components/airtable_oauth/sources/new-field/new-field.mjs +++ b/components/airtable_oauth/sources/new-field/new-field.mjs @@ -5,11 +5,14 @@ export default { name: "New Field Created (Instant)", description: "Emit new event when a field is created in the selected table. [See the documentation](https://airtable.com/developers/web/api/get-base-schema)", key: "airtable_oauth-new-field", - version: "1.0.3", + version: "1.0.4", type: "source", dedupe: "unique", methods: { ...common.methods, + payloadFilter(payload) { + return !!payload.changedTablesById; + }, getChangeTypes() { return [ "add", diff --git a/components/airtable_oauth/sources/new-modified-or-deleted-records-instant/new-modified-or-deleted-records-instant.mjs b/components/airtable_oauth/sources/new-modified-or-deleted-records-instant/new-modified-or-deleted-records-instant.mjs index a6c59ad99454e..197d6ec9d94a9 100644 --- a/components/airtable_oauth/sources/new-modified-or-deleted-records-instant/new-modified-or-deleted-records-instant.mjs +++ b/components/airtable_oauth/sources/new-modified-or-deleted-records-instant/new-modified-or-deleted-records-instant.mjs @@ -1,14 +1,13 @@ import common from "../common/common-webhook-record.mjs"; import constants from "../common/constants.mjs"; import sampleEmit from "./test-event.mjs"; -import airtable from "../../airtable_oauth.app.mjs"; export default { ...common, name: "New Record Created, Updated or Deleted (Instant)", description: "Emit new event when a record is added, updated, or deleted in a table or selected view.", key: "airtable_oauth-new-modified-or-deleted-records-instant", - version: "0.1.3", + version: "0.1.4", type: "source", dedupe: "unique", props: { @@ -27,7 +26,7 @@ export default { }, watchDataInFieldIds: { propDefinition: [ - airtable, + common.props.airtable, "sortFieldId", (c) => ({ baseId: c.baseId, @@ -42,6 +41,9 @@ export default { }, methods: { ...common.methods, + payloadFilter(payload) { + return !!payload.changedTablesById; + }, getDataTypes() { return [ "tableData", diff --git a/components/airtable_oauth/sources/new-or-modified-field/new-or-modified-field.mjs b/components/airtable_oauth/sources/new-or-modified-field/new-or-modified-field.mjs index 47c067f500fa1..bc0bc80008010 100644 --- a/components/airtable_oauth/sources/new-or-modified-field/new-or-modified-field.mjs +++ b/components/airtable_oauth/sources/new-or-modified-field/new-or-modified-field.mjs @@ -6,11 +6,14 @@ export default { name: "New or Modified Field (Instant)", description: "Emit new event when a field is created or updated in the selected table", key: "airtable_oauth-new-or-modified-field", - version: "1.0.3", + version: "1.0.4", type: "source", dedupe: "unique", methods: { ...common.methods, + payloadFilter(payload) { + return !!payload.changedTablesById; + }, getChangeTypes() { return [ "add", diff --git a/components/airtable_oauth/sources/new-or-modified-records/new-or-modified-records.mjs b/components/airtable_oauth/sources/new-or-modified-records/new-or-modified-records.mjs index 1c466d83cd1fc..871f802159c4e 100644 --- a/components/airtable_oauth/sources/new-or-modified-records/new-or-modified-records.mjs +++ b/components/airtable_oauth/sources/new-or-modified-records/new-or-modified-records.mjs @@ -1,16 +1,18 @@ import common from "../common/common-webhook-record.mjs"; -import airtable from "../../airtable_oauth.app.mjs"; export default { ...common, name: "New or Modified Records (Instant)", key: "airtable_oauth-new-or-modified-records", description: "Emit new event for each new or modified record in a table or view", - version: "1.0.3", + version: "1.0.4", type: "source", dedupe: "unique", methods: { ...common.methods, + payloadFilter(payload) { + return !!payload.changedTablesById; + }, getChangeTypes() { return [ "add", @@ -22,7 +24,7 @@ export default { ...common.props, watchDataInFieldIds: { propDefinition: [ - airtable, + common.props.airtable, "sortFieldId", (c) => ({ baseId: c.baseId, diff --git a/components/airtable_oauth/sources/new-records/new-records.mjs b/components/airtable_oauth/sources/new-records/new-records.mjs index 117927ef51bbb..405d6f90693da 100644 --- a/components/airtable_oauth/sources/new-records/new-records.mjs +++ b/components/airtable_oauth/sources/new-records/new-records.mjs @@ -5,11 +5,14 @@ export default { name: "New Record(s) Created (Instant)", description: "Emit new event for each new record in a table", key: "airtable_oauth-new-records", - version: "1.0.3", + version: "1.0.4", type: "source", dedupe: "unique", methods: { ...common.methods, + payloadFilter(payload) { + return !!payload.changedTablesById; + }, getChangeTypes() { return [ "add", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 75e25ef3fe862..3cfc22053defb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -606,9 +606,15 @@ importers: airtable: specifier: ^0.11.1 version: 0.11.6 + async-retry: + specifier: ^1.3.3 + version: 1.3.3 bottleneck: specifier: ^2.19.5 version: 2.19.5 + crypto: + specifier: ^1.0.1 + version: 1.0.1 lodash.chunk: specifier: ^4.2.0 version: 4.2.0