From 8d2d86998aebd3776f843829c413c3f2839fb419 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Thu, 2 Apr 2026 23:26:20 +0200 Subject: [PATCH 1/4] docu and triggerEvent --- docs/use-as-cap-outbox/index.md | 119 ++++++++++++++++++ src/outbox/EventQueueGenericOutboxHandler.js | 48 ++++++- .../outboxProject/srv/service/saga-service.js | 8 ++ test/eventQueueOutbox.test.js | 111 ++++++++++++++++ 4 files changed, 281 insertions(+), 5 deletions(-) diff --git a/docs/use-as-cap-outbox/index.md b/docs/use-as-cap-outbox/index.md index 4266b0dd..c43b7c3a 100644 --- a/docs/use-as-cap-outbox/index.md +++ b/docs/use-as-cap-outbox/index.md @@ -409,3 +409,122 @@ this.on("myBatchEvent", (req) => { })); }); ``` + +## Saga Pattern + +The event-queue supports a saga-style pattern for chaining event handlers. When a handler completes, the +event-queue automatically triggers a designated **successor event** — either a success or a failure follow-up — +allowing you to model multi-step asynchronous workflows with clear separation of concerns. + +### How It Works + +Register successor handlers with the special `#succeeded` or `#failed` suffix: + +- **`/#succeeded`** — triggered when the handler for `` returns `EventProcessingStatus.Done` +- **`/#failed`** — triggered when the handler returns `EventProcessingStatus.Error` or throws + +You can also register **generic** successor handlers (`#succeeded` / `#failed`) that apply to every event in the +service that does not have a dedicated successor. + +```javascript +class MyService extends cds.Service { + async init() { + await super.init(); + + // Primary handler + this.on("orderCreated", async (req) => { + // ... business logic + return EventProcessingStatus.Done; + }); + + // Runs on success — event-specific + this.on("orderCreated/#succeeded", async (req) => { + // req.eventQueue.triggerEvent is available here (see below) + }); + + // Runs on failure — event-specific + this.on("orderCreated/#failed", async (req) => { + // req.data.error contains the serialised error message + }); + + // Generic fallback — runs for any event without a dedicated handler + this.on("#succeeded", async (req) => { /* ... */ }); + this.on("#failed", async (req) => { /* ... */ }); + } +} +``` + +### Passing Data to the Successor + +Return a `nextData` property from the primary handler to forward arbitrary data to the successor's `req.data`: + +```javascript +this.on("orderCreated", async (req) => { + const orderId = await createOrder(req.data); + return { + status: EventProcessingStatus.Done, + nextData: { orderId }, // available as req.data.orderId in the successor + }; +}); +``` + +### Accessing the Trigger Event Context (`req.eventQueue.triggerEvent`) + +When a successor handler is invoked, `req.eventQueue.triggerEvent` is populated with context from the parent event. +This gives the successor full visibility into what happened in the previous step. + +| Field | Type | Description | +| -------------------- | -------- | ------------------------------------------------------------------------------------------- | +| `triggerEventResult` | any | The raw return value of the parent handler (e.g. `{ status: 2, nextData: {...} }`) | +| `ID` | string | UUID of the parent queue entry | +| `status` | number | Status of the parent queue entry at processing time | +| `payload` | any | Payload of the parent queue entry | +| `referenceEntity` | string | Reference entity of the parent event (if set) | +| `referenceEntityKey` | string | Reference entity key of the parent event (if set) | +| `lastAttempTimestamp`| string | Timestamp of the last processing attempt of the parent event | + +`req.eventQueue.triggerEvent` is set in both **`#succeeded`** and **`#failed`** handlers, but only when the event +was processed as a single entry (no clustering). When the parent handler **threw an exception**, `triggerEventResult` +will be `undefined` — the queue entry fields (`ID`, `status`, `payload`, etc.) are still present. + +```javascript +this.on("orderCreated/#succeeded", async (req) => { + const { triggerEventResult, ID } = req.eventQueue.triggerEvent; + + // triggerEventResult is exactly what the parent handler returned + console.log(triggerEventResult); + // → { status: 2, nextData: { orderId: "..." } } + + // ID is the UUID of the parent queue entry + console.log(ID); // → "3f2e1a..." +}); +``` + +### Failure Handling and Error Propagation + +When a primary handler throws or returns `EventProcessingStatus.Error`, the `#failed` successor receives the +serialised error message in `req.data.error`: + +```javascript +this.on("orderCreated/#failed", async (req) => { + console.log(req.data.error); // → "Error: Payment gateway timeout" + // compensate, notify, etc. + return EventProcessingStatus.Done; +}); +``` + +### Stopping the Chain + +A `#failed` handler is a terminal step — the event-queue will **not** trigger another successor after it, even if +one is registered. This prevents infinite failure chains. + +### Service-Specific vs. Generic Handlers + +| Pattern | Applies to | +| -------------------- | ----------------------------- | +| `/#succeeded` | Only `` | +| `/#failed` | Only `` | +| `#succeeded` | All events without a specific handler | +| `#failed` | All events without a specific handler | + +Event-specific handlers take priority over generic ones. diff --git a/src/outbox/EventQueueGenericOutboxHandler.js b/src/outbox/EventQueueGenericOutboxHandler.js index 97cc9667..a5692d7a 100644 --- a/src/outbox/EventQueueGenericOutboxHandler.js +++ b/src/outbox/EventQueueGenericOutboxHandler.js @@ -18,6 +18,15 @@ const EVENT_QUEUE_ACTIONS = { SAGA_FAILED: "#failed", }; +const PROPAGATE_EVENT_QUEUE_ENTRIES = [ + "ID", + "lastAttempTimestamp", + "payload", + "referenceEntity", + "referenceEntityKey", + "status", +]; + class EventQueueGenericOutboxHandler extends EventQueueBaseClass { constructor(context, eventType, eventSubType, config) { super(context, eventType, eventSubType, config); @@ -334,11 +343,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { #buildDispatchData(payload, { key, queueEntries } = {}) { const { useEventQueueUser } = this.eventConfig; const userId = useEventQueueUser ? config.userId : payload.contextUser; + let triggerEvent; + + if (payload.data.triggerEvent) { + try { + triggerEvent = JSON.parse(payload.data.triggerEvent); + } catch (err) { + this.logger.error("[saga] error parsing triggering event data", err); + } finally { + delete payload.data.triggerEvent; + } + } + const req = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload); const invocationFn = payload._fromSend ? "send" : "emit"; delete req._fromSend; delete req.contextUser; - req.eventQueue = { processor: this, key, queueEntries, payload }; + req.eventQueue = { processor: this, key, queueEntries, payload, triggerEvent }; if (this.eventConfig.propagateContextProperties?.length && this.transactionMode === "isolated" && cds.context) { for (const prop of this.eventConfig.propagateContextProperties) { @@ -362,11 +383,11 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { } async processEvent(processContext, key, queueEntries, payload) { - let statusTuple; + let statusTuple, result; const { userId, invocationFn, req } = this.#buildDispatchData(payload, { key, queueEntries }); try { await this.#setContextUser(processContext, userId, req); - const result = await this.__srvUnboxed.tx(processContext)[invocationFn](req); + result = await this.__srvUnboxed.tx(processContext)[invocationFn](req); statusTuple = this.#determineResultStatus(result, queueEntries); } catch (err) { this.logger.error("error processing outboxed service call", err, { @@ -381,11 +402,11 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { ]); } - await this.#publishFollowupEvents(processContext, req, statusTuple); + await this.#publishFollowupEvents(processContext, req, statusTuple, result); return statusTuple; } - async #publishFollowupEvents(processContext, req, statusTuple) { + async #publishFollowupEvents(processContext, req, statusTuple, triggerEventResult) { const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS }); const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED }); @@ -412,11 +433,28 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { result.status === EventProcessingStatus.Done && !req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) ) { + if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) { + const triggerEventPropagate = { triggerEventResult }; + const [triggerEvent] = req.eventQueue.queueEntries; + for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) { + triggerEventPropagate[propertyName] = triggerEvent[propertyName]; + } + data.triggerEvent = JSON.stringify(triggerEventPropagate); + } + await this.__srv.tx(processContext).send(succeeded, data); } if (failed && result.status === EventProcessingStatus.Error) { result.error && (data.error = this._error2String(result.error)); + if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) { + const triggerEventPropagate = { triggerEventResult }; + const [triggerEvent] = req.eventQueue.queueEntries; + for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) { + triggerEventPropagate[propertyName] = triggerEvent[propertyName]; + } + data.triggerEvent = JSON.stringify(triggerEventPropagate); + } await this.__srv.tx(processContext).send(failed, data); } diff --git a/test/asset/outboxProject/srv/service/saga-service.js b/test/asset/outboxProject/srv/service/saga-service.js index 5bb4f171..f3ea7184 100644 --- a/test/asset/outboxProject/srv/service/saga-service.js +++ b/test/asset/outboxProject/srv/service/saga-service.js @@ -2,10 +2,13 @@ const cds = require("@sap/cds"); +const capturedTriggerEvents = {}; + class StandardService extends cds.Service { async init() { await super.init(); this.on("saga", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; cds.log(this.name).info(req.event, { data: req.data, user: req.user.id, @@ -38,6 +41,7 @@ class StandardService extends cds.Service { }); this.on("#succeeded", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; cds.log(this.name).info(req.event, { data: req.data, user: req.user.id, @@ -51,6 +55,7 @@ class StandardService extends cds.Service { }); this.on("#failed", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; cds.log(this.name).info(req.event, { data: req.data, user: req.user.id, @@ -78,6 +83,7 @@ class StandardService extends cds.Service { }); this.on("saga/#succeeded", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; cds.log(this.name).info(req.event, { data: req.data, user: req.user.id, @@ -91,6 +97,7 @@ class StandardService extends cds.Service { }); this.on("saga/#failed", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; cds.log(this.name).info(req.event, { data: req.data, user: req.user.id, @@ -131,4 +138,5 @@ class StandardService extends cds.Service { } } +StandardService.capturedTriggerEvents = capturedTriggerEvents; module.exports = StandardService; diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index a5f9ff23..e554c9f2 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -30,6 +30,7 @@ const eventScheduler = require("../src/shared/eventScheduler"); const CUSTOM_HOOKS_SRV = "OutboxCustomHooks"; const basePath = path.join(__dirname, "asset", "outboxProject"); +const SagaService = require(path.join(basePath, "srv/service/saga-service.js")); cds.env.requires.NotificationServicePeriodic = { impl: path.join(basePath, "srv/service/servicePeriodic.js"), @@ -2714,6 +2715,116 @@ describe("event-queue outbox", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); }); + + describe("triggerEvent propagation", () => { + beforeEach(() => { + Object.keys(SagaService.capturedTriggerEvents).forEach( + (key) => delete SagaService.capturedTriggerEvents[key] + ); + }); + + it("triggerEventResult reflects the return value of the parent handler", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#succeeded"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ status: EventProcessingStatus.Done }); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("triggerEventResult contains the full handler return value including nextData", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { nextData: { extra: "payload" } }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#succeeded"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ + status: EventProcessingStatus.Done, + nextData: { extra: "payload" }, + }); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("triggerEvent contains the parent queue entry ID, status, and payload", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 2, + additionalColumns: ["payload", "ID"], + }); + const sagaEntry = events.find((e) => JSON.parse(e.payload).event === "saga"); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#succeeded"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.ID).toEqual(sagaEntry.ID); + expect(triggerEvent.payload).toBeDefined(); + expect(triggerEvent.status).toEqual(expect.any(Number)); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("triggerEvent is undefined for the originating event (not a successor)", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + expect(SagaService.capturedTriggerEvents["saga"]).toBeUndefined(); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("triggerEvent is propagated to the failed handler with triggerEventResult", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#failed"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); + expect(triggerEvent.ID).toEqual(expect.any(String)); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("generic #succeeded handler also receives triggerEvent", async () => { + const service = await cds.connect.to("Saga"); + await service.send("general", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["#succeeded"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ status: EventProcessingStatus.Done }); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("triggerEventResult is undefined when the handler threw an exception but other fields are still set", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { throw: "simulated error", nextData: { status: EventProcessingStatus.Done } }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#failed"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toBeUndefined(); + expect(triggerEvent.ID).toEqual(expect.any(String)); + expect(SagaService.capturedTriggerEvents["saga/#succeeded"]).toBeUndefined(); + expect(loggerMock.callsLengths().error).toEqual(1); + }); + }); }); describe("retry open and failed after", () => { From 1443329ae260de5e41edc9488962e405484b9d87 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Fri, 3 Apr 2026 10:28:47 +0200 Subject: [PATCH 2/4] fixes --- src/config.js | 31 +- src/outbox/EventQueueGenericOutboxHandler.js | 33 +- .../outboxProject/srv/service/saga-service.js | 39 +++ test/eventQueueOutbox.test.js | 294 ++++++++++++++---- 4 files changed, 309 insertions(+), 88 deletions(-) diff --git a/src/config.js b/src/config.js index 4fa68814..3e0f6404 100644 --- a/src/config.js +++ b/src/config.js @@ -30,6 +30,7 @@ const UTC_DEFAULT = false; const USE_CRON_TZ_DEFAULT = true; const SAGA_SUCCESS = "#succeeded"; const SAGA_FAILED = "#failed"; +const SAGA_DONE = "#done"; const BASE_TABLES = { EVENT: "sap.eventqueue.Event", @@ -387,19 +388,21 @@ class Config { result.adHoc ); result.adHoc[key] = specificEventConfig; - const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/"); - if (config.events[sagaSuccessKey]) { - const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction( - srvConfig, - name, - fnName, - result.adHoc - ); - result.adHoc[sagaKey] = sagaSpecificEventConfig; - } else { - const sagaConfig = { ...specificEventConfig }; - sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/"); - result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig; + for (const sagaSuffix of [SAGA_SUCCESS, SAGA_DONE, SAGA_FAILED]) { + const sagaKey = [fnName, sagaSuffix].join("/"); + if (config.events[sagaKey]) { + const [adHocKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction( + srvConfig, + name, + fnName, + result.adHoc + ); + result.adHoc[adHocKey] = sagaSpecificEventConfig; + } else { + const sagaConfig = { ...specificEventConfig }; + sagaConfig.subType = [sagaConfig.subType, sagaSuffix].join("/"); + result.adHoc[[key, sagaSuffix].join("/")] = sagaConfig; + } } } } @@ -434,7 +437,7 @@ class Config { } const [withoutSaga, sagaSuffix] = action.split("/"); - if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) { + if ([SAGA_FAILED, SAGA_SUCCESS, SAGA_DONE].includes(sagaSuffix)) { if (config?.events?.[withoutSaga]) { return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]); } diff --git a/src/outbox/EventQueueGenericOutboxHandler.js b/src/outbox/EventQueueGenericOutboxHandler.js index a5692d7a..96dc0ccc 100644 --- a/src/outbox/EventQueueGenericOutboxHandler.js +++ b/src/outbox/EventQueueGenericOutboxHandler.js @@ -16,6 +16,7 @@ const EVENT_QUEUE_ACTIONS = { CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload", SAGA_SUCCESS: "#succeeded", SAGA_FAILED: "#failed", + SAGA_DONE: "#done", }; const PROPAGATE_EVENT_QUEUE_ENTRIES = [ @@ -321,7 +322,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { return genericHandler ?? null; } - if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) { + if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_DONE)) { [event] = event.split("/"); } @@ -345,7 +346,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { const userId = useEventQueueUser ? config.userId : payload.contextUser; let triggerEvent; - if (payload.data.triggerEvent) { + if (payload.data?.triggerEvent) { try { triggerEvent = JSON.parse(payload.data.triggerEvent); } catch (err) { @@ -409,12 +410,13 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { async #publishFollowupEvents(processContext, req, statusTuple, triggerEventResult) { const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS }); const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED }); + const done = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_DONE }); - if (!succeeded && !failed) { + if (!succeeded && !failed && !done) { return; } - if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) { + if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_DONE)) { return; } @@ -426,6 +428,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { tx._eventQueue.events = []; } + const queued = cds.queued(this.__srv); for (const [, result] of statusTuple) { const data = result.nextData ?? req.data; if ( @@ -442,7 +445,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { data.triggerEvent = JSON.stringify(triggerEventPropagate); } - await this.__srv.tx(processContext).send(succeeded, data); + await queued.tx(processContext).send(succeeded, data); } if (failed && result.status === EventProcessingStatus.Error) { @@ -455,7 +458,19 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { } data.triggerEvent = JSON.stringify(triggerEventPropagate); } - await this.__srv.tx(processContext).send(failed, data); + await queued.tx(processContext).send(failed, data); + } + + if (done && !req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) { + if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) { + const triggerEventPropagate = { triggerEventResult }; + const [triggerEvent] = req.eventQueue.queueEntries; + for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) { + triggerEventPropagate[propertyName] = triggerEvent[propertyName]; + } + data.triggerEvent = JSON.stringify(triggerEventPropagate); + } + await queued.tx(processContext).send(done, data); } delete result.nextData; @@ -464,7 +479,11 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { if (config.insertEventsBeforeCommit) { this.nextSagaEvents = tx._eventQueue?.events; } else { - this.nextSagaEvents = tx._eventQueue?.events.filter((event) => JSON.parse(event.payload).event === failed); + const hasError = statusTuple.some(([, result]) => result.status === EventProcessingStatus.Error); + this.nextSagaEvents = tx._eventQueue?.events.filter((event) => { + const eventName = JSON.parse(event.payload).event; + return eventName === failed || (hasError && eventName === done); + }); } if (tx._eventQueue) { diff --git a/test/asset/outboxProject/srv/service/saga-service.js b/test/asset/outboxProject/srv/service/saga-service.js index f3ea7184..0685246a 100644 --- a/test/asset/outboxProject/srv/service/saga-service.js +++ b/test/asset/outboxProject/srv/service/saga-service.js @@ -135,6 +135,45 @@ class StandardService extends cds.Service { ...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }), }; }); + + this.on("saga/#done", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; + cds.log(this.name).info(req.event, { + data: req.data, + user: req.user.id, + error: req.data.error, + }); + + return { + status: req.data.status ?? 2, + }; + }); + + this.on("specific/#done", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; + cds.log(this.name).info(req.event, { + data: req.data, + user: req.user.id, + error: req.data.error, + }); + + return { + status: req.data.status ?? 2, + }; + }); + + this.on("#done", (req) => { + capturedTriggerEvents[req.event] = req.eventQueue?.triggerEvent; + cds.log(this.name).info(req.event, { + data: req.data, + user: req.user.id, + error: req.data.error, + }); + + return { + status: req.data.status ?? 2, + }; + }); } } diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index e554c9f2..7dab816e 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -986,6 +986,7 @@ describe("event-queue outbox", () => { delete eventQueue.config._rawEventMap["default##CAP_OUTBOX##NotificationServicePeriodic.action"]; delete eventQueue.config._rawEventMap["default##CAP_OUTBOX##NotificationServicePeriodic.action/#succeeded"]; delete eventQueue.config._rawEventMap["default##CAP_OUTBOX##NotificationServicePeriodic.action/#failed"]; + delete eventQueue.config._rawEventMap["default##CAP_OUTBOX##NotificationServicePeriodic.action/#done"]; // NOTE: after deleting the config make sure config is not available await processEventQueue(tx.context, "CAP_OUTBOX", [service.name, "action"].join(".")); @@ -2498,15 +2499,20 @@ describe("event-queue outbox", () => { await service.send("saga", {}); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); - const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, - additionalColumns: ["payload"], - }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); + let [finished, done, succeeded] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + expect(JSON.parse(finished.payload)).toMatchObject({ event: "saga" }); expect(done.status).toEqual(EventProcessingStatus.Done); - expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" }); - expect(next.status).toEqual(EventProcessingStatus.Done); + expect(JSON.parse(succeeded.payload)).toMatchObject({ event: "saga/#succeeded" }); + expect(succeeded.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "saga/#done" }); + expect(done.status).toEqual(EventProcessingStatus.Done); expect(loggerMock.callsLengths().error).toEqual(0); }); @@ -2519,7 +2525,7 @@ describe("event-queue outbox", () => { await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + expectedLength: 3, additionalColumns: ["payload", "lastAttemptTimestamp"], }); expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); @@ -2537,7 +2543,7 @@ describe("event-queue outbox", () => { await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + expectedLength: 3, additionalColumns: ["payload", "error"], }); expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); @@ -2555,7 +2561,7 @@ describe("event-queue outbox", () => { await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + expectedLength: 3, additionalColumns: ["payload", "lastAttemptTimestamp"], }); expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); @@ -2577,17 +2583,15 @@ describe("event-queue outbox", () => { }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); - const [done, next, nextFailed] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 3, + const events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 4, additionalColumns: ["payload", "lastAttemptTimestamp"], }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); + const done = events.find((e) => JSON.parse(e.payload).event === "saga"); + const next = events.find((e) => JSON.parse(e.payload).event === "saga/#succeeded"); + const nextFailed = events.find((e) => JSON.parse(e.payload).event === "saga/#failed"); expect(done.status).toEqual(EventProcessingStatus.Done); - - expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" }); expect(next.status).toEqual(EventProcessingStatus.Error); - - expect(JSON.parse(nextFailed.payload)).toMatchObject({ event: "saga/#failed" }); expect(nextFailed.status).toEqual(EventProcessingStatus.Error); expect(loggerMock.callsLengths().error).toEqual(0); }); @@ -2603,7 +2607,7 @@ describe("event-queue outbox", () => { await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + expectedLength: 3, additionalColumns: ["payload"], }); expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); @@ -2621,28 +2625,26 @@ describe("event-queue outbox", () => { await service.send("specific", { status: EventProcessingStatus.Done }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific`); - let [next, done] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + let events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, additionalColumns: ["payload"], }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "specific" }); - expect(done.status).toEqual(EventProcessingStatus.Done); - - expect(JSON.parse(next.payload)).toMatchObject({ event: "specific/#succeeded" }); - expect(next.status).toEqual(EventProcessingStatus.Open); + let specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + let succeeded = events.find((e) => JSON.parse(e.payload).event === "specific/#succeeded"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(succeeded.status).toEqual(EventProcessingStatus.Open); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific/#succeeded`); - [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, additionalColumns: ["payload"], }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "specific" }); - expect(done.status).toEqual(EventProcessingStatus.Done); - - expect(JSON.parse(next.payload)).toMatchObject({ event: "specific/#succeeded" }); - expect(next.status).toEqual(EventProcessingStatus.Done); + specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + succeeded = events.find((e) => JSON.parse(e.payload).event === "specific/#succeeded"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(succeeded.status).toEqual(EventProcessingStatus.Done); expect(loggerMock.callsLengths().error).toEqual(0); }); @@ -2654,28 +2656,38 @@ describe("event-queue outbox", () => { }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific`); - let [next, done] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, - additionalColumns: ["payload"], - }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "specific" }); - expect(done.status).toEqual(EventProcessingStatus.Error); + let [finished, done, failed] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + + expect(finished.status).toEqual(EventProcessingStatus.Error); - expect(JSON.parse(next.payload)).toMatchObject({ event: "specific/#failed" }); - expect(next.status).toEqual(EventProcessingStatus.Open); + expect(JSON.parse(failed.payload)).toMatchObject({ event: "specific/#failed" }); + expect(failed.status).toEqual(EventProcessingStatus.Open); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "specific/#done" }); + expect(done.status).toEqual(EventProcessingStatus.Open); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific/#failed`); - [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, - additionalColumns: ["payload"], - }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "specific" }); - expect(done.status).toEqual(EventProcessingStatus.Error); + [finished, done, failed] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + expect(JSON.parse(finished.payload)).toMatchObject({ event: "specific" }); + expect(finished.status).toEqual(EventProcessingStatus.Error); - expect(JSON.parse(next.payload)).toMatchObject({ event: "specific/#failed" }); - expect(next.status).toEqual(EventProcessingStatus.Done); + expect(JSON.parse(failed.payload)).toMatchObject({ event: "specific/#failed" }); + expect(failed.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "specific/#done" }); + expect(done.status).toEqual(EventProcessingStatus.Open); expect(loggerMock.callsLengths().error).toEqual(0); }); }); @@ -2686,15 +2698,20 @@ describe("event-queue outbox", () => { await service.send("general", {}); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); - const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, - additionalColumns: ["payload"], - }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "general" }); - expect(done.status).toEqual(EventProcessingStatus.Done); + const [done, succeeded, finished] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + expect(JSON.parse(finished.payload)).toMatchObject({ event: "general" }); + expect(finished.status).toEqual(EventProcessingStatus.Done); - expect(JSON.parse(next.payload)).toMatchObject({ event: "#succeeded" }); - expect(next.status).toEqual(EventProcessingStatus.Done); + expect(JSON.parse(succeeded.payload)).toMatchObject({ event: "#succeeded" }); + expect(succeeded.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "#done" }); + expect(done.status).toEqual(EventProcessingStatus.Done); expect(loggerMock.callsLengths().error).toEqual(0); }); @@ -2703,15 +2720,20 @@ describe("event-queue outbox", () => { await service.send("general", { status: EventProcessingStatus.Error }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); - const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, - additionalColumns: ["payload", "lastAttemptTimestamp"], - }); - expect(JSON.parse(done.payload)).toMatchObject({ event: "general" }); - expect(done.status).toEqual(EventProcessingStatus.Error); + const [done, failed, finished] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + expect(JSON.parse(finished.payload)).toMatchObject({ event: "general" }); + expect(finished.status).toEqual(EventProcessingStatus.Error); - expect(JSON.parse(next.payload)).toMatchObject({ event: "#failed" }); - expect(next.status).toEqual(EventProcessingStatus.Done); + expect(JSON.parse(failed.payload)).toMatchObject({ event: "#failed" }); + expect(failed.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "#done" }); + expect(done.status).toEqual(EventProcessingStatus.Error); expect(loggerMock.callsLengths().error).toEqual(0); }); }); @@ -2757,7 +2779,7 @@ describe("event-queue outbox", () => { await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const events = await testHelper.selectEventQueueAndReturn(tx, { - expectedLength: 2, + expectedLength: 3, additionalColumns: ["payload", "ID"], }); const sagaEntry = events.find((e) => JSON.parse(e.payload).event === "saga"); @@ -2825,6 +2847,141 @@ describe("event-queue outbox", () => { expect(loggerMock.callsLengths().error).toEqual(1); }); }); + + describe("done handler", () => { + beforeEach(() => { + Object.keys(SagaService.capturedTriggerEvents).forEach( + (key) => delete SagaService.capturedTriggerEvents[key] + ); + }); + + it("fires alongside #succeeded when event is green", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + const [orig, next1, next2] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + expect(JSON.parse(orig.payload)).toMatchObject({ event: "saga" }); + expect(orig.status).toEqual(EventProcessingStatus.Done); + + const payloads = [next1, next2].map((e) => JSON.parse(e.payload).event); + expect(payloads).toContain("saga/#succeeded"); + expect(payloads).toContain("saga/#done"); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("fires alongside #failed when event is red", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + const [orig, next1, next2] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + expect(JSON.parse(orig.payload)).toMatchObject({ event: "saga" }); + expect(orig.status).toEqual(EventProcessingStatus.Error); + + const payloads = [next1, next2].map((e) => JSON.parse(e.payload).event); + expect(payloads).toContain("saga/#failed"); + expect(payloads).toContain("saga/#done"); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("receives triggerEvent with triggerEventResult on success", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#done"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ status: EventProcessingStatus.Done }); + expect(triggerEvent.ID).toEqual(expect.any(String)); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("receives triggerEvent with triggerEventResult and error on failure", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["saga/#done"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toMatchObject({ status: EventProcessingStatus.Error }); + expect(triggerEvent.ID).toEqual(expect.any(String)); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("receives req.data.error when parent threw an exception", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { throw: "simulated error", nextData: { status: EventProcessingStatus.Done } }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + const doneEvent = events.find((e) => JSON.parse(e.payload).event === "saga/#done"); + expect(JSON.parse(doneEvent.payload)).toMatchObject({ + event: "saga/#done", + data: { error: expect.stringContaining("simulated error") }, + }); + expect(loggerMock.callsLengths().error).toEqual(1); + }); + + it("generic #done handler fires for events without a specific handler", async () => { + const service = await cds.connect.to("Saga"); + await service.send("general", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + const triggerEvent = SagaService.capturedTriggerEvents["#done"]; + expect(triggerEvent).toBeDefined(); + expect(triggerEvent.triggerEventResult).toEqual({ status: EventProcessingStatus.Done }); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("does not trigger further successors (terminal)", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + + await testHelper.selectEventQueueAndExpectDone(tx, { + expectedLength: 3, + }); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("works with insertEventsBeforeCommit=false", async () => { + config.insertEventsBeforeCommit = false; + const service = await cds.connect.to("Saga"); + await service.send("saga", {}); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + const events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + const payloads = events.map((e) => JSON.parse(e.payload).event); + expect(payloads).toContain("saga/#succeeded"); + expect(payloads).toContain("saga/#done"); + expect(loggerMock.callsLengths().error).toEqual(0); + config.insertEventsBeforeCommit = true; + }); + }); }); describe("retry open and failed after", () => { @@ -2867,8 +3024,11 @@ describe("event-queue outbox", () => { it("SELECT", async () => { const service = await cds.connect.to("NotificationService"); const srvQueued = cds.queued(service); - await srvQueued.read("Event"); - await commitAndOpenNew(); + await cds.tx({}, async (tx) => { + await srvQueued.read(service.entities().Event); + const test = await service.read(service.entities().Event); + await commitAndOpenNew(); + }); const [event] = await testHelper.selectEventQueueAndReturn(tx, { expectedLength: 1, additionalColumns: ["payload"], From 4b59070b1b9046bf577ab428f2f8e440c96593f7 Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Fri, 3 Apr 2026 10:31:49 +0200 Subject: [PATCH 3/4] linter --- docs/use-as-cap-outbox/index.md | 36 ++++++++++++++++++--------------- test/eventQueueOutbox.test.js | 5 +++-- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/docs/use-as-cap-outbox/index.md b/docs/use-as-cap-outbox/index.md index c43b7c3a..19bbcf9c 100644 --- a/docs/use-as-cap-outbox/index.md +++ b/docs/use-as-cap-outbox/index.md @@ -448,8 +448,12 @@ class MyService extends cds.Service { }); // Generic fallback — runs for any event without a dedicated handler - this.on("#succeeded", async (req) => { /* ... */ }); - this.on("#failed", async (req) => { /* ... */ }); + this.on("#succeeded", async (req) => { + /* ... */ + }); + this.on("#failed", async (req) => { + /* ... */ + }); } } ``` @@ -463,7 +467,7 @@ this.on("orderCreated", async (req) => { const orderId = await createOrder(req.data); return { status: EventProcessingStatus.Done, - nextData: { orderId }, // available as req.data.orderId in the successor + nextData: { orderId }, // available as req.data.orderId in the successor }; }); ``` @@ -473,15 +477,15 @@ this.on("orderCreated", async (req) => { When a successor handler is invoked, `req.eventQueue.triggerEvent` is populated with context from the parent event. This gives the successor full visibility into what happened in the previous step. -| Field | Type | Description | -| -------------------- | -------- | ------------------------------------------------------------------------------------------- | -| `triggerEventResult` | any | The raw return value of the parent handler (e.g. `{ status: 2, nextData: {...} }`) | -| `ID` | string | UUID of the parent queue entry | -| `status` | number | Status of the parent queue entry at processing time | -| `payload` | any | Payload of the parent queue entry | -| `referenceEntity` | string | Reference entity of the parent event (if set) | -| `referenceEntityKey` | string | Reference entity key of the parent event (if set) | -| `lastAttempTimestamp`| string | Timestamp of the last processing attempt of the parent event | +| Field | Type | Description | +| --------------------- | ------ | ---------------------------------------------------------------------------------- | +| `triggerEventResult` | any | The raw return value of the parent handler (e.g. `{ status: 2, nextData: {...} }`) | +| `ID` | string | UUID of the parent queue entry | +| `status` | number | Status of the parent queue entry at processing time | +| `payload` | any | Payload of the parent queue entry | +| `referenceEntity` | string | Reference entity of the parent event (if set) | +| `referenceEntityKey` | string | Reference entity key of the parent event (if set) | +| `lastAttempTimestamp` | string | Timestamp of the last processing attempt of the parent event | `req.eventQueue.triggerEvent` is set in both **`#succeeded`** and **`#failed`** handlers, but only when the event was processed as a single entry (no clustering). When the parent handler **threw an exception**, `triggerEventResult` @@ -520,10 +524,10 @@ one is registered. This prevents infinite failure chains. ### Service-Specific vs. Generic Handlers -| Pattern | Applies to | -| -------------------- | ----------------------------- | -| `/#succeeded` | Only `` | -| `/#failed` | Only `` | +| Pattern | Applies to | +| -------------------- | ------------------------------------- | +| `/#succeeded` | Only `` | +| `/#failed` | Only `` | | `#succeeded` | All events without a specific handler | | `#failed` | All events without a specific handler | diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index 7dab816e..655dd534 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -3022,11 +3022,12 @@ describe("event-queue outbox", () => { describe("outbox queries", () => { it("SELECT", async () => { + // FIXME: read still returns undefined const service = await cds.connect.to("NotificationService"); const srvQueued = cds.queued(service); - await cds.tx({}, async (tx) => { + await cds.tx({}, async () => { await srvQueued.read(service.entities().Event); - const test = await service.read(service.entities().Event); + await service.read(service.entities().Event); await commitAndOpenNew(); }); const [event] = await testHelper.selectEventQueueAndReturn(tx, { From 29100dc2650164f9e1b181aecd8547c9b76a002c Mon Sep 17 00:00:00 2001 From: Max Gruenfelder Date: Fri, 3 Apr 2026 10:51:03 +0200 Subject: [PATCH 4/4] fix and add more tests --- src/config.js | 3 +- test/eventQueueOutbox.test.js | 141 +++++++++++++++++++++++++++++++++- 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/src/config.js b/src/config.js index 3e0f6404..d11d033b 100644 --- a/src/config.js +++ b/src/config.js @@ -389,8 +389,7 @@ class Config { ); result.adHoc[key] = specificEventConfig; for (const sagaSuffix of [SAGA_SUCCESS, SAGA_DONE, SAGA_FAILED]) { - const sagaKey = [fnName, sagaSuffix].join("/"); - if (config.events[sagaKey]) { + if (config.events[sagaSuffix]) { const [adHocKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction( srvConfig, name, diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index 655dd534..da3743e6 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -178,6 +178,14 @@ cds.env.requires.Saga = { }, }; +cds.env.requires.SagaSpecificConfig = { + impl: path.join(basePath, "srv/service/saga-service.js"), + queued: { + kind: "persistent-queue", + events: { "#succeeded": { propagateHeaders: ["dummy"] } }, + }, +}; + cds.env.requires["sapafcsdk.scheduling.ProviderService"] = { impl: path.join(basePath, "srv/service/standard-service.js"), outbox: { @@ -2619,7 +2627,7 @@ describe("event-queue outbox", () => { }); }); - describe("specific event configuration", () => { + describe("specific callback handler (succeeded|done|failed)", () => { it("succeeded", async () => { const service = await cds.connect.to("Saga"); await service.send("specific", { status: EventProcessingStatus.Done }); @@ -2648,6 +2656,34 @@ describe("event-queue outbox", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); + it("done", async () => { + const service = await cds.connect.to("Saga"); + await service.send("specific", { status: EventProcessingStatus.Done }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific`); + let events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + let specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + let done = events.find((e) => JSON.parse(e.payload).event === "specific/#done"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(done.status).toEqual(EventProcessingStatus.Open); + + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific/#done`); + + events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + done = events.find((e) => JSON.parse(e.payload).event === "specific/#done"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(done.status).toEqual(EventProcessingStatus.Done); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + it("failed", async () => { const service = await cds.connect.to("Saga"); await service.send("specific", { @@ -2692,6 +2728,109 @@ describe("event-queue outbox", () => { }); }); + // TODO: make sure do check for #done but also for specific/#done + // NOTE: think about: we registered a config for #succeeded however the service has a specific/#succeeded handler. So the config is not used + describe.skip("specific event config (succeeded|done|failed)", () => { + it("succeeded", async () => { + const service = await cds.connect.to("SagaSpecificConfig"); + await service.send("specific", { status: EventProcessingStatus.Done }, { dummy: "dummyTest" }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}`); + let events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + let specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + let succeeded = events.find((e) => JSON.parse(e.payload).event === "specific/#succeeded"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(succeeded.status).toEqual(EventProcessingStatus.Done); + + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}/#succeeded`); + + events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + succeeded = events.find((e) => JSON.parse(e.payload).event === "specific/#succeeded"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(succeeded.status).toEqual(EventProcessingStatus.Done); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("done", async () => { + const service = await cds.connect.to("SagaSpecificConfig"); + await service.send("specific", { status: EventProcessingStatus.Done }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific`); + let events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + let specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + let done = events.find((e) => JSON.parse(e.payload).event === "specific/#done"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(done.status).toEqual(EventProcessingStatus.Open); + + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific/#done`); + + events = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }); + specific = events.find((e) => JSON.parse(e.payload).event === "specific"); + done = events.find((e) => JSON.parse(e.payload).event === "specific/#done"); + expect(specific.status).toEqual(EventProcessingStatus.Done); + expect(done.status).toEqual(EventProcessingStatus.Done); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + + it("failed", async () => { + const service = await cds.connect.to("SagaSpecificConfig"); + await service.send("specific", { + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific`); + let [finished, done, failed] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + + expect(finished.status).toEqual(EventProcessingStatus.Error); + + expect(JSON.parse(failed.payload)).toMatchObject({ event: "specific/#failed" }); + expect(failed.status).toEqual(EventProcessingStatus.Open); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "specific/#done" }); + expect(done.status).toEqual(EventProcessingStatus.Open); + + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", `${service.name}.specific/#failed`); + + [finished, done, failed] = ( + await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload"], + }) + ).sort((a, b) => JSON.parse(a.payload).event.localeCompare(JSON.parse(b.payload).event)); + expect(JSON.parse(finished.payload)).toMatchObject({ event: "specific" }); + expect(finished.status).toEqual(EventProcessingStatus.Error); + + expect(JSON.parse(failed.payload)).toMatchObject({ event: "specific/#failed" }); + expect(failed.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(done.payload)).toMatchObject({ event: "specific/#done" }); + expect(done.status).toEqual(EventProcessingStatus.Open); + expect(loggerMock.callsLengths().error).toEqual(0); + }); + }); + describe("general handlers", () => { it("if succeeded handler exists and event is green, trigger next event", async () => { const service = await cds.connect.to("Saga");