Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions docs/use-as-cap-outbox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,126 @@ this.on("myBatchEvent", (req) => {
}));
});
```

## Saga Pattern

The event-queue supports a saga-style pattern for chaining event handlers. When a handler completes, the
event-queue automatically triggers a designated **successor event** — either a success or a failure follow-up —
allowing you to model multi-step asynchronous workflows with clear separation of concerns.

### How It Works

Register successor handlers with the special `#succeeded` or `#failed` suffix:

- **`<event>/#succeeded`** — triggered when the handler for `<event>` returns `EventProcessingStatus.Done`
- **`<event>/#failed`** — triggered when the handler returns `EventProcessingStatus.Error` or throws

You can also register **generic** successor handlers (`#succeeded` / `#failed`) that apply to every event in the
service that does not have a dedicated successor.

```javascript
class MyService extends cds.Service {
async init() {
await super.init();

// Primary handler
this.on("orderCreated", async (req) => {
// ... business logic
return EventProcessingStatus.Done;
});

// Runs on success — event-specific
this.on("orderCreated/#succeeded", async (req) => {
// req.eventQueue.triggerEvent is available here (see below)
});

// Runs on failure — event-specific
this.on("orderCreated/#failed", async (req) => {
// req.data.error contains the serialised error message
});

// Generic fallback — runs for any event without a dedicated handler
this.on("#succeeded", async (req) => {
/* ... */
});
this.on("#failed", async (req) => {
/* ... */
});
}
}
```

### Passing Data to the Successor

Return a `nextData` property from the primary handler to forward arbitrary data to the successor's `req.data`:

```javascript
this.on("orderCreated", async (req) => {
const orderId = await createOrder(req.data);
return {
status: EventProcessingStatus.Done,
nextData: { orderId }, // available as req.data.orderId in the successor
};
});
```

### Accessing the Trigger Event Context (`req.eventQueue.triggerEvent`)

When a successor handler is invoked, `req.eventQueue.triggerEvent` is populated with context from the parent event.
This gives the successor full visibility into what happened in the previous step.

| Field | Type | Description |
| --------------------- | ------ | ---------------------------------------------------------------------------------- |
| `triggerEventResult` | any | The raw return value of the parent handler (e.g. `{ status: 2, nextData: {...} }`) |
| `ID` | string | UUID of the parent queue entry |
| `status` | number | Status of the parent queue entry at processing time |
| `payload` | any | Payload of the parent queue entry |
| `referenceEntity` | string | Reference entity of the parent event (if set) |
| `referenceEntityKey` | string | Reference entity key of the parent event (if set) |
| `lastAttempTimestamp` | string | Timestamp of the last processing attempt of the parent event |

`req.eventQueue.triggerEvent` is set in both **`#succeeded`** and **`#failed`** handlers, but only when the event
was processed as a single entry (no clustering). When the parent handler **threw an exception**, `triggerEventResult`
will be `undefined` — the queue entry fields (`ID`, `status`, `payload`, etc.) are still present.

```javascript
this.on("orderCreated/#succeeded", async (req) => {
const { triggerEventResult, ID } = req.eventQueue.triggerEvent;

// triggerEventResult is exactly what the parent handler returned
console.log(triggerEventResult);
// → { status: 2, nextData: { orderId: "..." } }

// ID is the UUID of the parent queue entry
console.log(ID); // → "3f2e1a..."
});
```

### Failure Handling and Error Propagation

When a primary handler throws or returns `EventProcessingStatus.Error`, the `#failed` successor receives the
serialised error message in `req.data.error`:

```javascript
this.on("orderCreated/#failed", async (req) => {
console.log(req.data.error); // → "Error: Payment gateway timeout"
// compensate, notify, etc.
return EventProcessingStatus.Done;
});
```

### Stopping the Chain

A `#failed` handler is a terminal step — the event-queue will **not** trigger another successor after it, even if
one is registered. This prevents infinite failure chains.

### Service-Specific vs. Generic Handlers

| Pattern | Applies to |
| -------------------- | ------------------------------------- |
| `<event>/#succeeded` | Only `<event>` |
| `<event>/#failed` | Only `<event>` |
| `#succeeded` | All events without a specific handler |
| `#failed` | All events without a specific handler |

Event-specific handlers take priority over generic ones.
30 changes: 16 additions & 14 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const UTC_DEFAULT = false;
const USE_CRON_TZ_DEFAULT = true;
const SAGA_SUCCESS = "#succeeded";
const SAGA_FAILED = "#failed";
const SAGA_DONE = "#done";

const BASE_TABLES = {
EVENT: "sap.eventqueue.Event",
Expand Down Expand Up @@ -387,19 +388,20 @@ class Config {
result.adHoc
);
result.adHoc[key] = specificEventConfig;
const sagaSuccessKey = [fnName, SAGA_SUCCESS].join("/");
if (config.events[sagaSuccessKey]) {
const [sagaKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
srvConfig,
name,
fnName,
result.adHoc
);
result.adHoc[sagaKey] = sagaSpecificEventConfig;
} else {
const sagaConfig = { ...specificEventConfig };
sagaConfig.subType = [sagaConfig.subType, SAGA_SUCCESS].join("/");
result.adHoc[[key, SAGA_SUCCESS].join("/")] = sagaConfig;
for (const sagaSuffix of [SAGA_SUCCESS, SAGA_DONE, SAGA_FAILED]) {
if (config.events[sagaSuffix]) {
const [adHocKey, sagaSpecificEventConfig] = this.addCAPOutboxEventSpecificAction(
srvConfig,
name,
fnName,
result.adHoc
);
result.adHoc[adHocKey] = sagaSpecificEventConfig;
} else {
const sagaConfig = { ...specificEventConfig };
sagaConfig.subType = [sagaConfig.subType, sagaSuffix].join("/");
result.adHoc[[key, sagaSuffix].join("/")] = sagaConfig;
}
}
}
}
Expand Down Expand Up @@ -434,7 +436,7 @@ class Config {
}

const [withoutSaga, sagaSuffix] = action.split("/");
if ([SAGA_FAILED, SAGA_SUCCESS].includes(sagaSuffix)) {
if ([SAGA_FAILED, SAGA_SUCCESS, SAGA_DONE].includes(sagaSuffix)) {
if (config?.events?.[withoutSaga]) {
return this.#mixCAPPropertyNamesWithEventQueueNames(config.events[withoutSaga]);
}
Expand Down
79 changes: 68 additions & 11 deletions src/outbox/EventQueueGenericOutboxHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ const EVENT_QUEUE_ACTIONS = {
CHECK_AND_ADJUST: "eventQueueCheckAndAdjustPayload",
SAGA_SUCCESS: "#succeeded",
SAGA_FAILED: "#failed",
SAGA_DONE: "#done",
};

const PROPAGATE_EVENT_QUEUE_ENTRIES = [
"ID",
"lastAttempTimestamp",
"payload",
"referenceEntity",
"referenceEntityKey",
"status",
];

class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
constructor(context, eventType, eventSubType, config) {
super(context, eventType, eventSubType, config);
Expand Down Expand Up @@ -312,7 +322,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
return genericHandler ?? null;
}

if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) {
if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_DONE)) {
[event] = event.split("/");
}

Expand All @@ -334,11 +344,23 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
#buildDispatchData(payload, { key, queueEntries } = {}) {
const { useEventQueueUser } = this.eventConfig;
const userId = useEventQueueUser ? config.userId : payload.contextUser;
let triggerEvent;

if (payload.data?.triggerEvent) {
try {
triggerEvent = JSON.parse(payload.data.triggerEvent);
} catch (err) {
this.logger.error("[saga] error parsing triggering event data", err);
} finally {
delete payload.data.triggerEvent;
}
}

const req = payload._fromSend ? new cds.Request(payload) : new cds.Event(payload);
const invocationFn = payload._fromSend ? "send" : "emit";
delete req._fromSend;
delete req.contextUser;
req.eventQueue = { processor: this, key, queueEntries, payload };
req.eventQueue = { processor: this, key, queueEntries, payload, triggerEvent };

if (this.eventConfig.propagateContextProperties?.length && this.transactionMode === "isolated" && cds.context) {
for (const prop of this.eventConfig.propagateContextProperties) {
Expand All @@ -362,11 +384,11 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}

async processEvent(processContext, key, queueEntries, payload) {
let statusTuple;
let statusTuple, result;
const { userId, invocationFn, req } = this.#buildDispatchData(payload, { key, queueEntries });
try {
await this.#setContextUser(processContext, userId, req);
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](req);
result = await this.__srvUnboxed.tx(processContext)[invocationFn](req);
statusTuple = this.#determineResultStatus(result, queueEntries);
} catch (err) {
this.logger.error("error processing outboxed service call", err, {
Expand All @@ -381,19 +403,20 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
]);
}

await this.#publishFollowupEvents(processContext, req, statusTuple);
await this.#publishFollowupEvents(processContext, req, statusTuple, result);
return statusTuple;
}

async #publishFollowupEvents(processContext, req, statusTuple) {
async #publishFollowupEvents(processContext, req, statusTuple, triggerEventResult) {
const succeeded = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_SUCCESS });
const failed = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_FAILED });
const done = this.#checkHandlerExists({ event: req.event, saga: EVENT_QUEUE_ACTIONS.SAGA_DONE });

if (!succeeded && !failed) {
if (!succeeded && !failed && !done) {
return;
}

if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_DONE)) {
return;
}

Expand All @@ -405,19 +428,49 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
tx._eventQueue.events = [];
}

const queued = cds.queued(this.__srv);
for (const [, result] of statusTuple) {
const data = result.nextData ?? req.data;
if (
succeeded &&
result.status === EventProcessingStatus.Done &&
!req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)
) {
await this.__srv.tx(processContext).send(succeeded, data);
if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) {
const triggerEventPropagate = { triggerEventResult };
const [triggerEvent] = req.eventQueue.queueEntries;
for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) {
triggerEventPropagate[propertyName] = triggerEvent[propertyName];
}
data.triggerEvent = JSON.stringify(triggerEventPropagate);
}

await queued.tx(processContext).send(succeeded, data);
}

if (failed && result.status === EventProcessingStatus.Error) {
result.error && (data.error = this._error2String(result.error));
await this.__srv.tx(processContext).send(failed, data);
if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) {
const triggerEventPropagate = { triggerEventResult };
const [triggerEvent] = req.eventQueue.queueEntries;
for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) {
triggerEventPropagate[propertyName] = triggerEvent[propertyName];
}
data.triggerEvent = JSON.stringify(triggerEventPropagate);
}
await queued.tx(processContext).send(failed, data);
}

if (done && !req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) {
if (statusTuple.length === 1 && req.eventQueue.queueEntries.length === 1) {
const triggerEventPropagate = { triggerEventResult };
const [triggerEvent] = req.eventQueue.queueEntries;
for (const propertyName of PROPAGATE_EVENT_QUEUE_ENTRIES) {
triggerEventPropagate[propertyName] = triggerEvent[propertyName];
}
data.triggerEvent = JSON.stringify(triggerEventPropagate);
}
await queued.tx(processContext).send(done, data);
}

delete result.nextData;
Expand All @@ -426,7 +479,11 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
if (config.insertEventsBeforeCommit) {
this.nextSagaEvents = tx._eventQueue?.events;
} else {
this.nextSagaEvents = tx._eventQueue?.events.filter((event) => JSON.parse(event.payload).event === failed);
const hasError = statusTuple.some(([, result]) => result.status === EventProcessingStatus.Error);
this.nextSagaEvents = tx._eventQueue?.events.filter((event) => {
const eventName = JSON.parse(event.payload).event;
return eventName === failed || (hasError && eventName === done);
});
}

if (tx._eventQueue) {
Expand Down
Loading
Loading