diff --git a/.gitignore b/.gitignore index a1132f57..1c953d34 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ **/.project **/.settings *.code-workspace +CLAUDE.md # package managers bower_components/ @@ -30,6 +31,7 @@ data/afc.sqlite-wal data/test.sqlite coverage Claude.md +.claude/ # temp temp/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 91e7b9b3..20f3839c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - [CAP Queue] Add support for defining successor and failed events of event handlers. See documentation for how to use it. - [CAP Queue] Allow to propagate cds.context properties (e.g. features). This can be configured per event (`cds.env.requires[].queued.propagateContextProperties = ["features"]`) +- [Telemetry] Add opt-in event queue metrics (`collectEventQueueMetrics`). When enabled together with Redis and an OpenTelemetry MeterProvider, the module publishes `cap.event_queue.jobs.pending`, `cap.event_queue.jobs.in_progress`, and `cap.event_queue.stats.refresh_age` as Observable Gauges, broken down by namespace. See [documentation](https://cap-js-community.github.io/event-queue/telemetry/). 
### Fixed diff --git a/docs/Gemfile b/docs/Gemfile index d2dd01aa..80a79ba6 100644 --- a/docs/Gemfile +++ b/docs/Gemfile @@ -1,13 +1,14 @@ source "https://rubygems.org" -# https://pages.github.com/versions/ - # https://github.com/ruby/ruby -ruby "~> 3.4.7" +ruby ">= 3.4.7" + +# https://jekyllrb.com/docs/installation/ +gem "jekyll", "~> 4.3" -# NOTE: this fixes the relevant jekyll version -# https://github.com/github/pages-gem/releases -gem "github-pages", "~> 232", group: :jekyll_plugins +# Ruby 4.0 removed these from default gems +gem "logger" +gem "csv" # https://github.com/just-the-docs/just-the-docs/releases -gem "just-the-docs", "~> 0.10.1" \ No newline at end of file +gem "just-the-docs", "~> 0.10.1" diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index f2a6542e..a5554ce6 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -1,50 +1,18 @@ GEM remote: https://rubygems.org/ specs: - activesupport (8.1.1) - base64 - bigdecimal - concurrent-ruby (~> 1.0, >= 1.3.1) - connection_pool (>= 2.2.5) - drb - i18n (>= 1.6, < 2) - json - logger (>= 1.4.2) - minitest (>= 5.1) - securerandom (>= 0.3) - tzinfo (~> 2.0, >= 2.0.5) - uri (>= 0.13.1) addressable (2.8.7) public_suffix (>= 2.0.2, < 7.0) base64 (0.3.0) bigdecimal (3.3.1) - coffee-script (2.4.1) - coffee-script-source - execjs - coffee-script-source (1.12.2) colorator (1.1.0) - commonmarker (0.23.12) concurrent-ruby (1.3.5) - connection_pool (2.5.4) csv (3.3.5) - dnsruby (1.73.1) - base64 (>= 0.2) - logger (~> 1.6) - simpleidn (~> 0.2.1) - drb (2.2.3) em-websocket (0.5.3) eventmachine (>= 0.12.9) http_parser.rb (~> 0) - ethon (0.15.0) - ffi (>= 1.15.0) eventmachine (1.2.7) - execjs (2.10.0) - faraday (2.14.0) - faraday-net_http (>= 2.0, < 3.5) - json - logger - faraday-net_http (3.4.2) - net-http (~> 0.5) + ffi (1.17.2) ffi (1.17.2-aarch64-linux-gnu) ffi (1.17.2-aarch64-linux-musl) ffi (1.17.2-arm-linux-gnu) @@ -54,175 +22,57 @@ GEM ffi (1.17.2-x86_64-linux-gnu) ffi (1.17.2-x86_64-linux-musl) 
forwardable-extended (2.6.0) - gemoji (4.1.0) - github-pages (232) - github-pages-health-check (= 1.18.2) - jekyll (= 3.10.0) - jekyll-avatar (= 0.8.0) - jekyll-coffeescript (= 1.2.2) - jekyll-commonmark-ghpages (= 0.5.1) - jekyll-default-layout (= 0.1.5) - jekyll-feed (= 0.17.0) - jekyll-gist (= 1.5.0) - jekyll-github-metadata (= 2.16.1) - jekyll-include-cache (= 0.2.1) - jekyll-mentions (= 1.6.0) - jekyll-optional-front-matter (= 0.3.2) - jekyll-paginate (= 1.1.0) - jekyll-readme-index (= 0.3.0) - jekyll-redirect-from (= 0.16.0) - jekyll-relative-links (= 0.6.1) - jekyll-remote-theme (= 0.4.3) - jekyll-sass-converter (= 1.5.2) - jekyll-seo-tag (= 2.8.0) - jekyll-sitemap (= 1.4.0) - jekyll-swiss (= 1.0.0) - jekyll-theme-architect (= 0.2.0) - jekyll-theme-cayman (= 0.2.0) - jekyll-theme-dinky (= 0.2.0) - jekyll-theme-hacker (= 0.2.0) - jekyll-theme-leap-day (= 0.2.0) - jekyll-theme-merlot (= 0.2.0) - jekyll-theme-midnight (= 0.2.0) - jekyll-theme-minimal (= 0.2.0) - jekyll-theme-modernist (= 0.2.0) - jekyll-theme-primer (= 0.6.0) - jekyll-theme-slate (= 0.2.0) - jekyll-theme-tactile (= 0.2.0) - jekyll-theme-time-machine (= 0.2.0) - jekyll-titles-from-headings (= 0.5.3) - jemoji (= 0.13.0) - kramdown (= 2.4.0) - kramdown-parser-gfm (= 1.1.0) - liquid (= 4.0.4) - mercenary (~> 0.3) - minima (= 2.5.1) - nokogiri (>= 1.16.2, < 2.0) - rouge (= 3.30.0) - terminal-table (~> 1.4) - webrick (~> 1.8) - github-pages-health-check (1.18.2) - addressable (~> 2.3) - dnsruby (~> 1.60) - octokit (>= 4, < 8) - public_suffix (>= 3.0, < 6.0) - typhoeus (~> 1.3) - html-pipeline (2.14.3) - activesupport (>= 2) - nokogiri (>= 1.4) + google-protobuf (4.34.1) + bigdecimal + rake (~> 13.3) + google-protobuf (4.34.1-aarch64-linux-gnu) + bigdecimal + rake (~> 13.3) + google-protobuf (4.34.1-aarch64-linux-musl) + bigdecimal + rake (~> 13.3) + google-protobuf (4.34.1-arm64-darwin) + bigdecimal + rake (~> 13.3) + google-protobuf (4.34.1-x86_64-darwin) + bigdecimal + rake (~> 13.3) + 
google-protobuf (4.34.1-x86_64-linux-gnu) + bigdecimal + rake (~> 13.3) + google-protobuf (4.34.1-x86_64-linux-musl) + bigdecimal + rake (~> 13.3) http_parser.rb (0.8.0) i18n (1.14.7) concurrent-ruby (~> 1.0) - jekyll (3.10.0) + jekyll (4.4.1) addressable (~> 2.4) + base64 (~> 0.2) colorator (~> 1.0) csv (~> 3.0) em-websocket (~> 0.5) - i18n (>= 0.7, < 2) - jekyll-sass-converter (~> 1.0) + i18n (~> 1.0) + jekyll-sass-converter (>= 2.0, < 4.0) jekyll-watch (~> 2.0) - kramdown (>= 1.17, < 3) + json (~> 2.6) + kramdown (~> 2.3, >= 2.3.1) + kramdown-parser-gfm (~> 1.0) liquid (~> 4.0) - mercenary (~> 0.3.3) + mercenary (~> 0.3, >= 0.3.6) pathutil (~> 0.9) - rouge (>= 1.7, < 4) + rouge (>= 3.0, < 5.0) safe_yaml (~> 1.0) - webrick (>= 1.0) - jekyll-avatar (0.8.0) - jekyll (>= 3.0, < 5.0) - jekyll-coffeescript (1.2.2) - coffee-script (~> 2.2) - coffee-script-source (~> 1.12) - jekyll-commonmark (1.4.0) - commonmarker (~> 0.22) - jekyll-commonmark-ghpages (0.5.1) - commonmarker (>= 0.23.7, < 1.1.0) - jekyll (>= 3.9, < 4.0) - jekyll-commonmark (~> 1.4.0) - rouge (>= 2.0, < 5.0) - jekyll-default-layout (0.1.5) - jekyll (>= 3.0, < 5.0) - jekyll-feed (0.17.0) - jekyll (>= 3.7, < 5.0) - jekyll-gist (1.5.0) - octokit (~> 4.2) - jekyll-github-metadata (2.16.1) - jekyll (>= 3.4, < 5.0) - octokit (>= 4, < 7, != 4.4.0) + terminal-table (>= 1.8, < 4.0) + webrick (~> 1.7) jekyll-include-cache (0.2.1) jekyll (>= 3.7, < 5.0) - jekyll-mentions (1.6.0) - html-pipeline (~> 2.3) - jekyll (>= 3.7, < 5.0) - jekyll-optional-front-matter (0.3.2) - jekyll (>= 3.0, < 5.0) - jekyll-paginate (1.1.0) - jekyll-readme-index (0.3.0) - jekyll (>= 3.0, < 5.0) - jekyll-redirect-from (0.16.0) - jekyll (>= 3.3, < 5.0) - jekyll-relative-links (0.6.1) - jekyll (>= 3.3, < 5.0) - jekyll-remote-theme (0.4.3) - addressable (~> 2.0) - jekyll (>= 3.5, < 5.0) - jekyll-sass-converter (>= 1.0, <= 3.0.0, != 2.0.0) - rubyzip (>= 1.3.0, < 3.0) - jekyll-sass-converter (1.5.2) - sass (~> 3.4) + jekyll-sass-converter 
(3.1.0) + sass-embedded (~> 1.75) jekyll-seo-tag (2.8.0) jekyll (>= 3.8, < 5.0) - jekyll-sitemap (1.4.0) - jekyll (>= 3.7, < 5.0) - jekyll-swiss (1.0.0) - jekyll-theme-architect (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-cayman (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-dinky (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-hacker (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-leap-day (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-merlot (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-midnight (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-minimal (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-modernist (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-primer (0.6.0) - jekyll (> 3.5, < 5.0) - jekyll-github-metadata (~> 2.9) - jekyll-seo-tag (~> 2.0) - jekyll-theme-slate (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-tactile (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-theme-time-machine (0.2.0) - jekyll (> 3.5, < 5.0) - jekyll-seo-tag (~> 2.0) - jekyll-titles-from-headings (0.5.3) - jekyll (>= 3.3, < 5.0) jekyll-watch (2.2.1) listen (~> 3.0) - jemoji (0.13.0) - gemoji (>= 3, < 5) - html-pipeline (~> 2.2) - jekyll (>= 3.0, < 5.0) json (2.16.0) just-the-docs (0.10.1) jekyll (>= 3.8.5) @@ -239,62 +89,35 @@ GEM rb-inotify (~> 0.9, >= 0.9.10) logger (1.7.0) mercenary (0.3.6) - minima (2.5.1) - jekyll (>= 3.5, < 5.0) - jekyll-feed (~> 0.9) - jekyll-seo-tag (~> 2.1) - minitest (5.26.1) - net-http (0.7.0) - uri - nokogiri (1.18.10-aarch64-linux-gnu) - racc (~> 1.4) - nokogiri (1.18.10-aarch64-linux-musl) - racc (~> 1.4) - nokogiri (1.18.10-arm-linux-gnu) - racc (~> 1.4) - nokogiri (1.18.10-arm-linux-musl) - racc (~> 1.4) - nokogiri (1.18.10-arm64-darwin) - racc (~> 1.4) - nokogiri 
(1.18.10-x86_64-darwin) - racc (~> 1.4) - nokogiri (1.18.10-x86_64-linux-gnu) - racc (~> 1.4) - nokogiri (1.18.10-x86_64-linux-musl) - racc (~> 1.4) - octokit (4.25.1) - faraday (>= 1, < 3) - sawyer (~> 0.9) pathutil (0.16.2) forwardable-extended (~> 2.6) public_suffix (5.1.1) - racc (1.8.1) rake (13.3.1) rb-fsevent (0.11.2) rb-inotify (0.11.1) ffi (~> 1.0) rexml (3.4.4) rouge (3.30.0) - rubyzip (2.4.1) safe_yaml (1.0.5) - sass (3.7.4) - sass-listen (~> 4.0.0) - sass-listen (4.0.0) - rb-fsevent (~> 0.9, >= 0.9.4) - rb-inotify (~> 0.9, >= 0.9.7) - sawyer (0.9.3) - addressable (>= 2.3.5) - faraday (>= 0.17.3, < 3) - securerandom (0.4.1) - simpleidn (0.2.3) + sass-embedded (1.98.0-aarch64-linux-gnu) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-aarch64-linux-musl) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-arm-linux-gnueabihf) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-arm-linux-musleabihf) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-arm64-darwin) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-x86_64-darwin) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-x86_64-linux-gnu) + google-protobuf (~> 4.31) + sass-embedded (1.98.0-x86_64-linux-musl) + google-protobuf (~> 4.31) terminal-table (1.8.0) unicode-display_width (~> 1.1, >= 1.1.1) - typhoeus (1.5.0) - ethon (>= 0.9.0, < 0.16.0) - tzinfo (2.0.6) - concurrent-ruby (~> 1.0) unicode-display_width (1.8.0) - uri (1.1.1) webrick (1.9.1) PLATFORMS @@ -308,8 +131,10 @@ PLATFORMS x86_64-linux-musl DEPENDENCIES - github-pages (~> 232) + csv + jekyll (~> 4.3) just-the-docs (~> 0.10.1) + logger RUBY VERSION ruby 3.4.7p58 diff --git a/docs/_config.yml b/docs/_config.yml index 4c70bba3..ec396446 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -26,7 +26,7 @@ description: >- # Build settings baseurl: /event-queue markdown: kramdown -remote_theme: just-the-docs/just-the-docs@v0.4.0 +theme: just-the-docs # Exclude from processing. # The following items will not be processed, by default. 
diff --git a/docs/index.md b/docs/index.md index 7e30bf8e..274fc9ad 100644 --- a/docs/index.md +++ b/docs/index.md @@ -32,8 +32,9 @@ processing. job execution. - [Managed transactions](/event-queue/transaction-handling) for reliable event processing. - [Plug and play](setup) integration via `cds-plugin`. -- [Telemetry support and trace propagation](/event-queue/telemetry), including event-queue-specific telemetry data and - tracing information. +- [Telemetry support and trace propagation](/event-queue/telemetry), including distributed tracing, trace context + propagation, and opt-in OpenTelemetry metrics for real-time queue depth monitoring (`cap.event_queue.jobs.pending`, + `cap.event_queue.jobs.in_progress`). - Full support for local testing with SQLite, enabling feature development and testing in a local environment to boost productivity. - Asynchronous processing of resource-intensive tasks to improve UI responsiveness. @@ -70,13 +71,13 @@ processing. manages the load, distributing it across all available app instances.+ - OpenTelemetry and Trace Propagation - The event-queue integrates with OpenTelemetry, providing detailed telemetry data for monitoring and - troubleshooting. - OpenTelemetry captures traces, spans, and metrics that help diagnose performance bottlenecks, detect failures, and - understand the lifecycle of events. + troubleshooting. OpenTelemetry captures traces, spans, and metrics that help diagnose performance bottlenecks, + detect failures, and understand the lifecycle of events. - **Trace Propagation** ensures that tracing context is maintained across distributed services, allowing end-to-end - visibility - into event processing. This is especially beneficial in microservice architectures, where events traverse multiple - services and instances. 
With trace propagation, developers can track event execution paths, measure processing - times, - and correlate related events across different services, leading to improved observability and debugging - efficiency. + visibility into event processing. This is especially beneficial in microservice architectures, where events + traverse multiple services and instances. With trace propagation, developers can track event execution paths, + measure processing times, and correlate related events across different services, leading to improved observability + and debugging efficiency. + - **Queue Depth Metrics** – When enabled via `collectEventQueueMetrics`, the event-queue exposes real-time + Observable Gauges (`cap.event_queue.jobs.pending`, `cap.event_queue.jobs.in_progress`) per namespace, backed by + Redis and surfaced to any OpenTelemetry-compatible monitoring tool. diff --git a/docs/telemetry/img_2.png b/docs/telemetry/img_2.png new file mode 100644 index 00000000..02ad9171 Binary files /dev/null and b/docs/telemetry/img_2.png differ diff --git a/docs/telemetry/index.md b/docs/telemetry/index.md index 9f2d3090..0ed8b116 100644 --- a/docs/telemetry/index.md +++ b/docs/telemetry/index.md @@ -95,3 +95,68 @@ when a Dynatrace trace context is missing. ## Example Trace ![Example Trace](img_1.png) + +## Event Queue Metrics + +In addition to distributed tracing, the event-queue can expose real-time queue depth metrics via OpenTelemetry. When +enabled, it tracks how many events are **pending** (waiting to be processed) and **in progress** (actively being +processed) per namespace, stored in Redis and exposed as OpenTelemetry Observable Gauges. 
+ +### Metrics Published + +| Metric name | Unit | Description | +| ----------------------------------- | ---- | --------------------------------------------------------- | +| `cap.event_queue.jobs.pending` | 1 | Current number of events waiting to be processed | +| `cap.event_queue.jobs.in_progress` | 1 | Current number of events actively being processed | +| `cap.event_queue.stats.refresh_age` | s | Age of the most recent stats snapshot (staleness monitor) | + +All metrics carry a `queue.namespace` attribute so you can filter by namespace in your monitoring tool. + +### How It Works + +- **On INSERT**: when events are published and the transaction commits, the pending counter is incremented. +- **On UPDATE**: as events transition between statuses (Open → InProgress → Done/Error/Exceeded/Suspended), the + pending and inProgress counters are adjusted accordingly. +- **On each runner cycle**: the runner refreshes the absolute pending count per namespace from the database, so + counters stay accurate even after restarts or direct DB modifications. +- Gauges are polled by the OpenTelemetry metrics SDK on demand; the event-queue additionally refreshes its + internal stats snapshot every 30 seconds as a background task. + +### Enabling Metrics + +Metrics collection requires Redis and is **opt-in**. Enable it via the initialization parameter +`collectEventQueueMetrics`: + +```js +await eventQueue.initialize({ + // ... + collectEventQueueMetrics: true, +}); +``` + +Or via `cds.env` (e.g. 
in `package.json`): + +```json +{ + "cds": { + "eventQueue": { + "collectEventQueueMetrics": true + } + } +} +``` + +The full set of conditions required for metrics to be active: + +| Condition | Required value | Notes | +| -------------------------- | -------------- | ---------------------------------------------------- | +| `collectEventQueueMetrics` | `true` | Master switch; default `false` | +| `enableTelemetry` | `true` | Default `true`; global telemetry kill-switch | +| Redis enabled | yes | Stats are stored in Redis hashes | +| OpenTelemetry metrics SDK | present | `@opentelemetry/api` with a configured MeterProvider | + +If any condition is not met, `initMetrics()` returns immediately and no gauges are registered. + +### Example Dashboard + +![Event Queue Metrics Dashboard](img_2.png) diff --git a/src/config.js b/src/config.js index 83327bd3..4fa68814 100644 --- a/src/config.js +++ b/src/config.js @@ -107,6 +107,7 @@ class Config { #redisOptions; #insertEventsBeforeCommit; #enableTelemetry; + #collectEventQueueMetrics; #unsubscribeHandlers = []; #unsubscribedTenants = {}; #cronTimezone; @@ -901,6 +902,14 @@ class Config { return this.#enableTelemetry; } + set collectEventQueueMetrics(value) { + this.#collectEventQueueMetrics = value; + } + + get collectEventQueueMetrics() { + return this.#collectEventQueueMetrics; + } + get isMultiTenancy() { return !!cds.requires.multitenancy; } diff --git a/src/dbHandler.js b/src/dbHandler.js index 030db39d..2afe44ef 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -4,11 +4,14 @@ const cds = require("@sap/cds"); const redisPub = require("./redis/redisPub"); const config = require("./config"); +const eventQueueStats = require("./shared/eventQueueStats"); +const { EventProcessingStatus } = require("./constants"); const COMPONENT_NAME = "/eventQueue/dbHandler"; const registeredHandlers = { eventQueueDbHandler: false, beforeDbHandler: false, + updateDbHandler: false, }; const registerEventQueueDbHandler = (dbService) 
=> { @@ -26,36 +29,114 @@ const registerEventQueueDbHandler = (dbService) => { req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {}; const eventQueuePublishEvents = req.tx._.eventQueuePublishEvents; const data = Array.isArray(req.query.INSERT.entries) ? req.query.INSERT.entries : [req.query.INSERT.entries]; - const eventCombinations = Object.keys( - data.reduce((result, event) => { - const key = [event.type, event.subType, event.namespace].join("##"); - if ( - !config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) || - eventQueuePublishEvents[key] - ) { - return result; - } + + req.tx._.eventQueueStatsOpenCount = (req.tx._.eventQueueStatsOpenCount ?? 0) + data.length; + const newCombinations = data.reduce((result, event) => { + const key = [event.type, event.subType, event.namespace].join("##"); + if (config.hasEventAfterCommitFlag(event.type, event.subType, event.namespace) && !eventQueuePublishEvents[key]) { eventQueuePublishEvents[key] = true; - result[key] = true; - return result; - }, {}) - ); + result.push(key); + } + return result; + }, []); - eventCombinations.length && + req.tx._.eventQueueBroadcastCombinations ??= []; + req.tx._.eventQueueBroadcastCombinations.push(...newCombinations); + if (!req.tx._.eventQueueSucceededHandlerRegistered) { + req.tx._.eventQueueSucceededHandlerRegistered = true; req.on("succeeded", () => { - const events = eventCombinations.map((eventCombination) => { - const [type, subType, namespace] = eventCombination.split("##"); - return { type, subType, namespace }; - }); - - redisPub.broadcastEvent(req.tenant, events).catch((err) => { - cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { - tenant: req.tenant, - events, + if (config.collectEventQueueMetrics && config.redisEnabled && req.tx._.eventQueueStatsOpenCount) { + eventQueueStats + .incrementCounters(req.tenant, eventQueueStats.StatusField.Pending, req.tx._.eventQueueStatsOpenCount) + .catch((err) => { 
+ cds.log(COMPONENT_NAME).error("db handler failure during updating event stats", err, { + tenant: req.tenant, + }); + }); + } + const combinations = req.tx._.eventQueueBroadcastCombinations; + if (combinations.length) { + const events = combinations.map((combination) => { + const [type, subType, namespace] = combination.split("##"); + return { type, subType, namespace }; }); - }); + redisPub.broadcastEvent(req.tenant, events).catch((err) => { + cds.log(COMPONENT_NAME).error("db handler failure during broadcasting event", err, { + tenant: req.tenant, + events, + }); + }); + } }); + } }); + + if (!registeredHandlers.updateDbHandler) { + if (!config.collectEventQueueMetrics || !config.redisEnabled) { + return; + } + registeredHandlers.updateDbHandler = true; + dbService.after("UPDATE", def, (count, req) => { + const newStatus = req.query.UPDATE?.data?.status; + if (newStatus == null) { + return; + } + + req.tx._ = req.tx._ ?? {}; + req.tx._.eventQueueStatsPendingDelta = req.tx._.eventQueueStatsPendingDelta ?? 0; + req.tx._.eventQueueStatsInProgressDelta = req.tx._.eventQueueStatsInProgressDelta ?? 
0; + + if (newStatus === EventProcessingStatus.InProgress) { + req.tx._.eventQueueStatsPendingDelta -= count; + req.tx._.eventQueueStatsInProgressDelta += count; + } else if (newStatus === EventProcessingStatus.Error) { + req.tx._.eventQueueStatsInProgressDelta -= count; + req.tx._.eventQueueStatsPendingDelta += count; + } else if ( + newStatus === EventProcessingStatus.Done || + newStatus === EventProcessingStatus.Exceeded || + newStatus === EventProcessingStatus.Suspended + ) { + req.tx._.eventQueueStatsInProgressDelta -= count; + } + + if (!req.tx._.eventQueueUpdateSucceededHandlerRegistered) { + req.tx._.eventQueueUpdateSucceededHandlerRegistered = true; + req.on("succeeded", () => { + if (!config.redisEnabled) { + return; + } + const pendingDelta = req.tx._.eventQueueStatsPendingDelta; + const inProgressDelta = req.tx._.eventQueueStatsInProgressDelta; + const ops = []; + + if (pendingDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.Pending, pendingDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.Pending, pendingDelta) + ); + } + if (inProgressDelta !== 0) { + ops.push( + eventQueueStats.adjustTenantCounter(req.tenant, eventQueueStats.StatusField.InProgress, inProgressDelta), + eventQueueStats.adjustGlobalCounter(eventQueueStats.StatusField.InProgress, inProgressDelta) + ); + } + Promise.allSettled(ops).then((results) => { + for (const result of results) { + if (result.status === "rejected") { + cds + .log(COMPONENT_NAME) + .error("db handler failure during updating event stats on update", result.reason, { + tenant: req.tenant, + }); + } + } + }); + }); + } + }); + } }; module.exports = { diff --git a/src/initialize.js b/src/initialize.js index 392c55e8..6f860c08 100644 --- a/src/initialize.js +++ b/src/initialize.js @@ -17,6 +17,7 @@ const { getAllTenantIds } = require("./shared/cdsHelper"); const { EventProcessingStatus } = require("./constants"); const distributedLock = 
require("./shared/distributedLock"); const EventQueueError = require("./EventQueueError"); +const { initMetrics } = require("./shared/openTelemetry"); const readFileAsync = promisify(fs.readFile); @@ -49,6 +50,7 @@ const CONFIG_VARS = [ ["disableProcessingOfSuspendedTenants", true], ["namespace", "default"], ["processingNamespaces", ["default"]], + ["collectEventQueueMetrics", false], ]; /** @@ -78,6 +80,7 @@ const CONFIG_VARS = [ * @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful * @param {string} [options.namespace=default] - Default namespace in which events are published * @param {string} [options.processingNamespaces=[default]] - Namespaces which the application processes + * @param {boolean} [options.collectEventQueueMetrics=false] - Enable collection of event queue metrics (pending/inProgress counters) stored in Redis and exposed via OpenTelemetry gauges. */ const initialize = async (options = {}) => { if (config.initialized) { @@ -125,6 +128,7 @@ const initialize = async (options = {}) => { runInterval: config.runInterval, useAsCAPQueue: config.useAsCAPQueue, }); + initMetrics(); resolveFn(); }; diff --git a/src/runner/openEvents.js b/src/runner/openEvents.js index fc95f75a..71058dfa 100644 --- a/src/runner/openEvents.js +++ b/src/runner/openEvents.js @@ -29,12 +29,12 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { new Date(startTime.getTime() - 30 * MS_IN_DAYS).toISOString(), ")" ) - .columns("type", "subType", "namespace") + .columns("count(ID) as count", "type", "subType", "namespace") .groupBy("type", "subType", "namespace") ); const result = []; - for (const { type, subType, namespace } of entries) { + for (const { type, subType, namespace, count } of entries) { if (config.isCapOutboxEvent(type)) { const { srvName, actionName } = config.normalizeSubType(type, subType); try { @@ -48,14 +48,14 @@ const getOpenQueueEntries = async (tx, 
filterAppSpecificEvents = true) => { config.addCAPServiceWithoutEnvConfig(subType, service); } if (config.shouldBeProcessedInThisApplication(type, subType, namespace)) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } catch { /* ignore catch */ } finally { if (!filterAppSpecificEvents) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } else { @@ -64,10 +64,10 @@ const getOpenQueueEntries = async (tx, filterAppSpecificEvents = true) => { config.getEventConfig(type, subType, namespace) && config.shouldBeProcessedInThisApplication(type, subType, namespace) ) { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } else { - result.push({ namespace, type, subType }); + result.push({ namespace, type, subType, count }); } } } diff --git a/src/runner/runner.js b/src/runner/runner.js index 9295041c..9a2fcfcf 100644 --- a/src/runner/runner.js +++ b/src/runner/runner.js @@ -14,6 +14,7 @@ const common = require("../shared/common"); const config = require("../config"); const redisPub = require("../redis/redisPub"); const openEvents = require("./openEvents"); +const eventQueueStats = require("../shared/eventQueueStats"); const { runEventCombinationForTenant } = require("./runnerHelper"); const { trace } = require("../shared/openTelemetry"); @@ -141,7 +142,7 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { } catch (err) { logger.error("executing event queue run for multi instance and tenant failed", err); } - + const tenantCounts = {}; for (const tenantId of tenantIds) { try { await cds.tx({ tenant: tenantId }, async (tx) => { @@ -160,6 +161,18 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { tenantId, entries: entries.length, }); + tenantCounts[tenantId] = entries; + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); + for (const entry of entries) 
{ + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + if (config.collectEventQueueMetrics) { + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setTenantCounter(tenantId, namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating tenant stats failed", err, { tenantId, namespace })); + } + } if (!entries.length) { return; } @@ -178,6 +191,19 @@ const _executeEventsAllTenantsRedis = async (tenantIds) => { logger.error("broadcasting events for tenant failed", { tenantId }, err); } } + const globalPendingByNamespace = Object.fromEntries(config.processingNamespaces.map((namespace) => [namespace, 0])); + for (const tenantEntries of Object.values(tenantCounts)) { + for (const entry of tenantEntries) { + globalPendingByNamespace[entry.namespace] = (globalPendingByNamespace[entry.namespace] ?? 0) + entry.count; + } + } + if (config.collectEventQueueMetrics) { + for (const [namespace, count] of Object.entries(globalPendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } + } }; const _executeEventsAllTenants = async (tenantIds) => { @@ -367,6 +393,17 @@ const _singleTenantRedis = async () => { logger.info("broadcasting events for run", { entries: entries.length, }); + const pendingByNamespace = Object.fromEntries(config.processingNamespaces.map((name) => [name, 0])); + for (const entry of entries) { + pendingByNamespace[entry.namespace] = (pendingByNamespace[entry.namespace] ?? 
0) + entry.count; + } + if (config.collectEventQueueMetrics) { + for (const [namespace, count] of Object.entries(pendingByNamespace)) { + eventQueueStats + .setGlobalCounter(namespace, eventQueueStats.StatusField.Pending, count) + .catch((err) => logger.error("updating global stats failed", err, { namespace })); + } + } if (!entries.length) { return; } diff --git a/src/shared/eventQueueStats.js b/src/shared/eventQueueStats.js new file mode 100644 index 00000000..4bba3ecf --- /dev/null +++ b/src/shared/eventQueueStats.js @@ -0,0 +1,209 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const redis = require("./redis"); +const config = require("../config"); + +const COMPONENT_NAME = "/eventQueue/eventQueueStats"; + +const StatusField = { + Pending: "pending", + InProgress: "inProgress", +}; + +const _tenantKey = (tenantId) => `${config.redisNamespace(true)}##stats##tenant##${tenantId}`; +const _globalKey = () => `${config.redisNamespace(true)}##stats##global`; +const _keyPrefix = (namespace) => `${config.redisNamespace(false)}##${namespace}`; + +/** + * Atomically adjusts a tenant's event counter for the given status field. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustTenantCounter = async (tenantId, field, increment) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_tenantKey(tenantId), field, increment); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to adjust tenant stats counter", err, { tenantId, field, increment }); + } +}; + +/** + * Atomically adjusts the global event counter for the given status field. + * Also updates the `updatedAt` timestamp on the global hash. 
+ * + * @param {string} field - one of StatusField.* + * @param {number} increment - positive to increment, negative to decrement + */ +const adjustGlobalCounter = async (field, increment) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hIncrBy(_globalKey(), field, increment); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to adjust global stats counter", err, { field, increment }); + } +}; + +/** + * Increments a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [increment=1] + */ +const incrementCounters = async (tenantId, field, increment = 1) => { + await Promise.allSettled([adjustTenantCounter(tenantId, field, increment), adjustGlobalCounter(field, increment)]); +}; + +/** + * Decrements a tenant counter and the matching global counter in a single call. + * + * @param {string} tenantId + * @param {string} field - one of StatusField.* + * @param {number} [decrement=1] + */ +const decrementCounters = async (tenantId, field, decrement = 1) => { + await Promise.allSettled([adjustTenantCounter(tenantId, field, -decrement), adjustGlobalCounter(field, -decrement)]); +}; + +/** + * Returns the current stats hash for a single tenant. + * All counter values are returned as integers; missing fields default to 0. + * + * @param {string} tenantId + * @returns {Promise<{pending: number, inProgress: number}>} + */ +const getTenantStats = async (tenantId) => { + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_tenantKey(tenantId)); + return _parseCounterHash(raw); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to read tenant stats", err, { tenantId }); + return _emptyCounters(); + } +}; + +/** + * Returns the current global stats hash. + * All counter values are returned as integers; missing fields default to 0. 
+ * + * @returns {Promise<{pending: number, inProgress: number}>} + */ +const getGlobalStats = async () => { + try { + const client = await redis.createMainClientAndConnect(); + const raw = await client.hGetAll(_globalKey()); + return _parseCounterHash(raw); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to read global stats", err); + return _emptyCounters(); + } +}; + +/** + * Sets a tenant's stats counter for the given namespace and status field to an absolute value, + * overwriting any previously accumulated count. Failures are logged and swallowed. + * + * @param {string} tenantId + */ +const setTenantCounter = async (tenantId, namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##tenant##${tenantId}`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set tenant stats counter", err, { tenantId, namespace, field, value }); + } +}; + +const setGlobalCounter = async (namespace, field, value) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.hSet(`${_keyPrefix(namespace)}##stats##global`, field, value); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to set global stats counter", err, { namespace, field, value }); + } +}; + +const getAllNamespaceStats = async () => { + const namespaces = config.processingNamespaces; + const client = await redis.createMainClientAndConnect(); + const results = await Promise.allSettled( + namespaces.map(async (namespace) => { + const raw = await client.hGetAll(`${_keyPrefix(namespace)}##stats##global`); + return { namespace, stats: _parseCounterHash(raw) }; + }) + ); + const out = {}; + for (const result of results) { + if (result.status === "fulfilled") { + out[result.value.namespace] = result.value.stats; + } else { + cds.log(COMPONENT_NAME).error("failed to read namespace stats", result.reason); + } + } + return out; +}; + +const deleteTenantStats = async
(tenantId) => { + try { + const client = await redis.createMainClientAndConnect(); + await client.del(_tenantKey(tenantId)); + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to delete tenant stats", err, { tenantId }); + } +}; + +/** + * Resets the inProgress counter to 0 for all processing namespaces (global + all tenants). + * Called on instance startup to clean up stale counts left by a previous crash. + */ +const resetInProgressCounters = async () => { + try { + const clientOrCluster = await redis.createMainClientAndConnect(); + const clients = redis.isClusterMode() ? clientOrCluster.masters.map((master) => master.client) : [clientOrCluster]; + + const globalOps = config.processingNamespaces.map((namespace) => + clientOrCluster.hSet(`${_keyPrefix(namespace)}##stats##global`, StatusField.InProgress, 0) + ); + await Promise.allSettled(globalOps); + + // NOTE: use SCAN because KEYS is not supported for cluster clients + for (const client of clients) { + for await (const key of client.scanIterator({ MATCH: "*##stats##tenant##*", COUNT: 1000 })) { + await client.hSet(key, StatusField.InProgress, 0); + } + } + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to reset inProgress counters on startup", err); + } +}; + +const _parseCounterHash = (raw) => ({ + [StatusField.Pending]: raw[StatusField.Pending] != null ? parseInt(raw[StatusField.Pending]) : 0, + [StatusField.InProgress]: raw[StatusField.InProgress] != null ? 
parseInt(raw[StatusField.InProgress]) : 0, +}); + +const _emptyCounters = () => ({ + [StatusField.Pending]: 0, + [StatusField.InProgress]: 0, +}); + +module.exports = { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + setTenantCounter, + setGlobalCounter, + getAllNamespaceStats, + getTenantStats, + getGlobalStats, + deleteTenantStats, + resetInProgressCounters, +}; diff --git a/src/shared/openTelemetry.js b/src/shared/openTelemetry.js index f9d5896a..4192d6f2 100644 --- a/src/shared/openTelemetry.js +++ b/src/shared/openTelemetry.js @@ -12,9 +12,13 @@ const cds = require("@sap/cds"); const otel = _resilientRequire("@opentelemetry/api"); const config = require("../config"); +const eventQueueStats = require("./eventQueueStats"); const COMPONENT_NAME = "/shared/openTelemetry"; +let _statsSnapshot = null; +let _metricsInitialized = false; + const trace = async (context, label, fn, { attributes = {}, newRootSpan = false, traceContext } = {}) => { if (!config.enableTelemetry || !otel) { return fn(); @@ -110,4 +114,75 @@ const getCurrentTraceContext = () => { return carrier; }; -module.exports = { trace, getCurrentTraceContext }; +const _refreshStats = async () => { + try { + const namespaces = await eventQueueStats.getAllNamespaceStats(); + _statsSnapshot = { namespaces, lastRefreshedAt: Date.now() }; + } catch (err) { + cds.log(COMPONENT_NAME).error("failed to refresh queue stats for metrics", err); + } +}; + +const initMetrics = () => { + if ( + _metricsInitialized || + !config.collectEventQueueMetrics || + !config.enableTelemetry || + !config.redisEnabled || + !otel?.metrics + ) { + return; + } + const meterProvider = otel.metrics.getMeterProvider?.(); + if (!meterProvider) { + return; + } + + _metricsInitialized = true; + + eventQueueStats + .resetInProgressCounters() + .catch((err) => cds.log(COMPONENT_NAME).error("failed to reset inProgress counters", err)); + + const meter = 
otel.metrics.getMeter("@cap-js-community/event-queue"); + + const pendingGauge = meter.createObservableGauge("cap.event_queue.jobs.pending", { + description: "Current number of jobs waiting to be processed.", + unit: "1", + }); + const inProgressGauge = meter.createObservableGauge("cap.event_queue.jobs.in_progress", { + description: "Current number of jobs actively being processed by workers.", + unit: "1", + }); + const refreshAgeGauge = meter.createObservableGauge("cap.event_queue.stats.refresh_age", { + description: "Age of the most recent queue statistics snapshot.", + unit: "s", + }); + + _statsSnapshot = { + lastRefreshedAt: Date.now(), + namespaces: Object.fromEntries( + config.processingNamespaces.map((namespace) => [namespace, { pending: 0, inProgress: 0 }]) + ), + }; + _refreshStats(); + + meter.addBatchObservableCallback( + (observableResult) => { + if (!_statsSnapshot) { + return; + } + observableResult.observe(refreshAgeGauge, (Date.now() - _statsSnapshot.lastRefreshedAt) / 1000); + for (const [namespace, stats] of Object.entries(_statsSnapshot.namespaces)) { + const attrs = { "queue.namespace": namespace }; + observableResult.observe(pendingGauge, stats.pending, attrs); + observableResult.observe(inProgressGauge, stats.inProgress, attrs); + } + }, + [pendingGauge, inProgressGauge, refreshAgeGauge] + ); + + setInterval(_refreshStats, 30_000).unref(); +}; + +module.exports = { trace, getCurrentTraceContext, initMetrics }; diff --git a/test-integration/__snapshots__/e2e-redis.test.js.snap b/test-integration/__snapshots__/e2e-redis.test.js.snap index 2f662a8e..161f47ef 100644 --- a/test-integration/__snapshots__/e2e-redis.test.js.snap +++ b/test-integration/__snapshots__/e2e-redis.test.js.snap @@ -11,7 +11,9 @@ exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert n exports[`end-to-end redis broadcast checkAndInsertPeriodicEvents should insert new events and runner should broadcast + process events 2`] = ` [ 
"EVENT_QUEUE##default##RUN_REDIS_CHECK", + "EVENT_QUEUE##default##stats##global", "EVENT_QUEUE##default####TEST_STATIC", + "EVENT_QUEUE##default##stats##tenant##undefined", ] `; @@ -24,6 +26,8 @@ exports[`end-to-end redis broadcast insert entry: redis broadcast + process 1`] exports[`end-to-end redis broadcast insert entry: redis broadcast + process 2`] = ` [ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", "EVENT_QUEUE##default####TEST_STATIC", ] `; @@ -34,4 +38,9 @@ exports[`end-to-end runner should select open events and process + validate skip ] `; -exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = `[]`; +exports[`end-to-end runner should select open events and process + validate skip broadcast 2`] = ` +[ + "EVENT_QUEUE##default##stats##tenant##undefined", + "EVENT_QUEUE##default##stats##global", +] +`; diff --git a/test-integration/__snapshots__/runner.test.js.snap b/test-integration/__snapshots__/runner.test.js.snap index 54c00387..9009169c 100644 --- a/test-integration/__snapshots__/runner.test.js.snap +++ b/test-integration/__snapshots__/runner.test.js.snap @@ -86,6 +86,16 @@ exports[`runner redis multi tenant no open events 2`] = ` "PX": 60000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "0", + }, + }, } `; @@ -95,6 +105,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -105,6 +116,7 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -115,6 +127,7 @@ exports[`runner redis multi 
tenant tenant id filter should not acquire lock - on "9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -126,7 +139,20 @@ exports[`runner redis multi tenant tenant id filter should not acquire lock - on exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 2`] = `[]`; -exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = `{}`; +exports[`runner redis multi tenant tenant id filter should not acquire lock - only process tenants based on tenant filter with open events - split into two instances 3`] = ` +{ + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "2", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, +} +`; exports[`runner redis multi tenant with open events - broadcast should be called 1`] = ` [ @@ -134,6 +160,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "cd805323-879c-4bf7-b19c-8ffbbee22e1f", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -144,6 +171,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "9f3ed8f0-8aaf-439e-a96a-04cd5b680c59", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -154,6 +182,7 @@ exports[`runner redis multi tenant with open events - broadcast should be called "e9bb8ec0-c85e-4035-b7cf-1b11ba8e5792", [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", @@ -203,6 +232,16 @@ exports[`runner redis multi tenant with open events - broadcast should be called "PX": 60000, }, }, + 
"EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "3", + }, + }, + "EVENT_QUEUE##default##stats##tenant##TEST_STATIC": { + "hash": { + "pending": "1", + }, + }, } `; @@ -233,6 +272,11 @@ exports[`runner redis single tenant no open events 2`] = ` "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "0", + }, + }, } `; @@ -263,5 +307,10 @@ exports[`runner redis single tenant with open events - broadcast should be calle "PX": 1425000, }, }, + "EVENT_QUEUE##default##stats##global": { + "hash": { + "pending": "1", + }, + }, } `; diff --git a/test-integration/dbHandlerStats.test.js b/test-integration/dbHandlerStats.test.js new file mode 100644 index 00000000..6a7cdbf4 --- /dev/null +++ b/test-integration/dbHandlerStats.test.js @@ -0,0 +1,386 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("../test/mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds"); +cds.test(__dirname + "/_env"); + +const basePath = path.join(__dirname, "..", "test", "asset", "outboxProject"); +cds.env.requires.StandardService = { + impl: path.join(basePath, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(basePath, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("../test/helper"); +const { Logger: mockLogger } = require("../test/mocks/logger"); + +describe("dbHandler - stats tracking on HANA", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await 
eventQueue.initialize({ + processEventsAfterPublish: false, + registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + const db = await cds.connect.to("db"); + cds.emit("connect", db); + config.redisEnabled = true; + config.collectEventQueueMetrics = true; + eventQueue.registerEventQueueDbHandler(db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await cds.tx({}, async (tx2) => { + await tx2.run(DELETE.from("sap.eventqueue.Lock")); + await tx2.run(DELETE.from("sap.eventqueue.Event")); + }); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + config.collectEventQueueMetrics = false; + await cds.disconnect(); + await cds.shutdown(); + }); + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + 
expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const innerTx = cds.tx(context); + const service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + describe("UPDATE handler — HANA affectedRows behavior", () => { + it("Open → InProgress: HANA affectedRows 
returns correct count for bulk update of 3 rows", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + // On HANA, affectedRows must be 3 — not the fallback of 1 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: HANA affectedRows correctly restores all rows as pending", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await 
service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: HANA affectedRows correctly decrements all inProgress", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=3 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=3 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("UPDATE matching 0 rows does not affect counters", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); // pending=1 + + // WHERE clause matches nothing — affectedRows=0, must not change counters + await 
tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) // nothing is InProgress yet + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when redisEnabled is false", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + config.redisEnabled = false; + try { + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + } finally { + config.redisEnabled 
= true; + } + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", { to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await 
getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); diff --git a/test-integration/e2e-redis.test.js b/test-integration/e2e-redis.test.js index e310968c..49b9c093 100644 --- a/test-integration/e2e-redis.test.js +++ b/test-integration/e2e-redis.test.js @@ -75,6 +75,7 @@ describe("end-to-end", () => { isEventQueueActive: true, }); eventQueue.config.redisEnabled = true; + eventQueue.config.collectEventQueueMetrics = true; cds.emit("connect", await cds.connect.to("db")); }); @@ -82,7 +83,6 @@ describe("end-to-end", () => { beforeEach(async () => { await DELETE.from("sap.eventqueue.Lock"); await DELETE.from("sap.eventqueue.Event"); - await DELETE.from("cds.outbox.Messages"); jest.clearAllMocks(); redisMock.clearState(); redisMock.clearTestState(); diff --git a/test-integration/keep-alive-tx-handling-e2e.test.js b/test-integration/keep-alive-tx-handling-e2e.test.js index 748e34fa..f7783815 100644 --- a/test-integration/keep-alive-tx-handling-e2e.test.js +++ b/test-integration/keep-alive-tx-handling-e2e.test.js @@ -20,6 +20,7 @@ const distributedLock = require("../src/shared/distributedLock"); const periodicEvents = require("../src/periodicEvents"); const { publishEvent } = require("../src/publishEvent"); const redisPub = 
require("../src/redis/redisPub"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); @@ -48,6 +49,7 @@ describe("keep-alive-tx-handling-e2e", () => { registerAsEventProcessor: false, isEventQueueActive: false, }); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); loggerMock = mockLogger(); const db = await cds.connect.to("db"); @@ -110,6 +112,7 @@ describe("keep-alive-tx-handling-e2e", () => { subType: isolatedNoParallel.subType, }) ); + const { db } = cds.services; const { Event } = cds.entities("sap.eventqueue"); let forUpdateCounter = 0; diff --git a/test-integration/runner.test.js b/test-integration/runner.test.js index 79e8a153..0c63671d 100644 --- a/test-integration/runner.test.js +++ b/test-integration/runner.test.js @@ -7,6 +7,7 @@ cds.test(__dirname + "/_env"); const mockRedis = require("../test/mocks/redisMock"); jest.mock("../src/shared/redis", () => mockRedis); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); const WorkerQueue = require("../src/shared/WorkerQueue"); const processEventQueue = require("../src/processEventQueue"); const openEvents = require("../src/runner/openEvents"); @@ -58,6 +59,7 @@ describe("runner", () => { }); configInstance = eventQueue.config; configInstance.redisEnabled = true; + configInstance.collectEventQueueMetrics = true; loggerMock = mockLogger(); jest.clearAllMocks(); }); @@ -176,6 +178,49 @@ describe("runner", () => { expect(loggerMock.callsLengths().error).toEqual(0); }); + describe("stats tracking", () => { + it("sets pending counter per tenant and globally after runner with open events", async () => { + await cds.tx({}, async (tx2) => { + await periodicEvents.checkAndInsertPeriodicEvents(tx2.context); + }); + mockTenantIds(tenantIds); + jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, 
"checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis(), runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(1); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(tenantIds.length); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not update stats when there are no open events", async () => { + mockTenantIds(tenantIds); + jest.spyOn(redisPub, "broadcastEvent").mockResolvedValue(); + jest.spyOn(periodicEvents, "checkAndInsertPeriodicEvents").mockResolvedValue(); + + await Promise.allSettled([runner.__._multiTenancyRedis()]); + await Promise.allSettled(WorkerQueue.instance.runningPromises); + await new Promise(setImmediate); + + for (const tenantId of tenantIds) { + const stats = await getTenantStats(tenantId); + expect(stats[StatusField.Pending]).toBe(0); + } + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + }); + describe("tenant id filter should not acquire lock - only process tenants based on tenant filter", () => { it("always false", async () => { await cds.tx({}, async (tx2) => { diff --git a/test/__snapshots__/baseFunctionality.test.js.snap b/test/__snapshots__/baseFunctionality.test.js.snap index 3c7688e1..d1c4249a 100644 --- a/test/__snapshots__/baseFunctionality.test.js.snap +++ b/test/__snapshots__/baseFunctionality.test.js.snap @@ -111,31 +111,37 @@ exports[`baseFunctionality ad-hoc events error handling missing event implementa exports[`baseFunctionality getOpenQueueEntries event types in error should be considered 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": 
"EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -146,26 +152,31 @@ exports[`baseFunctionality getOpenQueueEntries event types in error should be co exports[`baseFunctionality getOpenQueueEntries event types in progress should be ignored 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -176,46 +187,55 @@ exports[`baseFunctionality getOpenQueueEntries event types in progress should be exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "both", "type": "AppSpecific_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, 
"namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -226,31 +246,37 @@ exports[`baseFunctionality getOpenQueueEntries filterAppSpecificEvents return op exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "DELETE_EVENTS", "type": "EVENT_QUEUE_BASE_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DBKeepAlive", "type": "HealthCheckKeepAlive_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "DB", "type": "HealthCheck_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", }, { + "count": 1, "namespace": "default", "subType": "everyFiveMin", "type": "TimeSpecificEveryFiveMin_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "cron", "type": "TimeSpecificEveryMin_PERIODIC", @@ -261,11 +287,13 @@ exports[`baseFunctionality getOpenQueueEntries return open event types 2`] = ` exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -276,6 +304,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -286,11 +315,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect 
app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app instance configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppInstance", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -301,11 +332,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app instance confi exports[`baseFunctionality getOpenQueueEntries should respect app name configuration both open event relevant for app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -316,6 +349,7 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", @@ -326,11 +360,13 @@ exports[`baseFunctionality getOpenQueueEntries should respect app name configura exports[`baseFunctionality getOpenQueueEntries should respect app name configuration one open event for app and one not for this app but redis should ignore filter 2`] = ` [ { + "count": 1, "namespace": "default", "subType": "AppName", "type": "AppSpecific", }, { + "count": 1, "namespace": "default", "subType": "Task", "type": "Notifications", diff --git a/test/__snapshots__/eventQueueOutbox.test.js.snap b/test/__snapshots__/eventQueueOutbox.test.js.snap index 33f65d63..9ce8cc2c 100644 --- a/test/__snapshots__/eventQueueOutbox.test.js.snap +++ b/test/__snapshots__/eventQueueOutbox.test.js.snap @@ -419,26 +419,31 @@ exports[`event-queue outbox monkeyPatchCAPOutbox=true req.data should be stored exports[`event-queue outbox monkeyPatchCAPOutbox=true return open event types 1`] = ` [ { 
+ "count": 1, "namespace": "default", "subType": "NotificationService", "type": "CAP_OUTBOX", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "NotificationServicePeriodic.randomOffset", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "QueueService.main", "type": "CAP_OUTBOX_PERIODIC", }, { + "count": 1, "namespace": "default", "subType": "sapafcsdk.scheduling.ProviderService.timeBucketAction", "type": "CAP_OUTBOX_PERIODIC", diff --git a/test/dbHandlerStats.test.js b/test/dbHandlerStats.test.js new file mode 100644 index 00000000..412dd1c7 --- /dev/null +++ b/test/dbHandlerStats.test.js @@ -0,0 +1,342 @@ +"use strict"; + +const path = require("path"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const cds = require("@sap/cds/lib"); + +const eventQueue = require("../src"); +const config = require("../src/config"); +const { getTenantStats, getGlobalStats, StatusField } = require("../src/shared/eventQueueStats"); +const { EventProcessingStatus } = require("../src/constants"); +const { processEventQueue } = require("../src/processEventQueue"); +const testHelper = require("./helper"); +const { Logger: mockLogger } = require("./mocks/logger"); + +const outboxProject = path.join(__dirname, "asset", "outboxProject"); + +cds.env.requires.StandardService = { + impl: path.join(outboxProject, "srv/service/standard-service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.env.requires.NotificationService = { + impl: path.join(outboxProject, "srv/service/service.js"), + outbox: { kind: "persistent-outbox" }, +}; + +cds.test(outboxProject); + +describe("dbHandler - stats tracking via CAP outbox", () => { + let context, tx, loggerMock; + + beforeAll(async () => { + eventQueue.config.initialized = false; + await eventQueue.initialize({ + 
processEventsAfterPublish: false, + registerAsEventProcessor: false, + insertEventsBeforeCommit: true, + useAsCAPOutbox: true, + userId: "dummyTestUser", + }); + cds.emit("connect", await cds.connect.to("db")); + config.redisEnabled = true; + config.collectEventQueueMetrics = true; + eventQueue.registerEventQueueDbHandler(cds.db); + loggerMock = mockLogger(); + }); + + beforeEach(async () => { + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + await tx.run(DELETE.from("sap.eventqueue.Lock")); + await tx.run(DELETE.from("sap.eventqueue.Event")); + await commitAndOpenNew(); + mockRedis.clearState(); + jest.clearAllMocks(); + }); + + afterEach(async () => { + await tx.rollback(); + }); + + afterAll(async () => { + config.redisEnabled = false; + config.collectEventQueueMetrics = false; + await cds.shutdown; + }); + + it("increments pending counter by 1 after single send and commit", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("accumulates pending counter for multiple sends in same transaction", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(3); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not increment counter when transaction is rolled back", async () => { + const 
innerTx = cds.tx(context); + const service = (await cds.connect.to("StandardService")).tx(innerTx.context); + await service.send("main", {}); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + }); + + it("does not increment counter when redisEnabled is false", async () => { + config.redisEnabled = false; + try { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + } finally { + config.redisEnabled = true; + } + }); + + it("tracks stats per tenant while global counter aggregates", async () => { + // tenant 123: send 2 events + const service123 = (await cds.connect.to("StandardService")).tx(context); + await service123.send("main", {}); + await service123.send("main", {}); + await commitAndOpenNew(); + + // tenant 456: send 1 event in its own tx + const ctx456 = new cds.EventContext({ user: "testUser", tenant: 456 }); + const tx456 = cds.tx(ctx456); + const service456 = (await cds.connect.to("StandardService")).tx(ctx456); + await service456.send("main", {}); + await tx456.commit(); + + const stats123 = await getTenantStats(123); + expect(stats123[StatusField.Pending]).toBe(2); + + const stats456 = await getTenantStats(456); + expect(stats456[StatusField.Pending]).toBe(1); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(3); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + describe("UPDATE handler — direct status transitions", () => { + it("Open → InProgress: decrements pending, increments inProgress", async () => { + const service = (await 
cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(2); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(2); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Done: decrements inProgress, no pending change", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Error: decrements inProgress, increments pending (will be retried)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + 
UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Error }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(2); + expect(globalStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("InProgress → Exceeded: decrements inProgress, no pending change", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await commitAndOpenNew(); // pending=0, inProgress=2 + + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Exceeded }) + .where({ status: EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("two UPDATEs in one transaction accumulate into a single succeeded handler call", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + // Open→InProgress then InProgress→Done without an intermediate commit + await tx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await tx.run( + UPDATE.entity("sap.eventqueue.Event") + .set({ status: EventProcessingStatus.Done }) + .where({ status: 
EventProcessingStatus.InProgress }) + ); + await commitAndOpenNew(); + + // Net delta: pending -2, inProgress +2 then -2 → both counters at 0 + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + expect(loggerMock.callsLengths().error).toBe(0); + }); + + it("does not adjust counters when transaction is rolled back", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await service.send("main", {}); + await commitAndOpenNew(); // pending=2 + + const innerTx = cds.tx(context); + await innerTx.run(UPDATE.entity("sap.eventqueue.Event").set({ status: EventProcessingStatus.InProgress })); + await innerTx.rollback(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(2); + expect(tenantStats[StatusField.InProgress]).toBe(0); + }); + }); + + describe("processEventQueue integration — stats via real processing", () => { + it("successful processing transitions pending → inProgress → Done (counters reach zero)", async () => { + const service = (await cds.connect.to("StandardService")).tx(context); + await service.send("main", {}); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "StandardService"); + await commitAndOpenNew(); + + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(0); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(0); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectDone(tx, { expectedLength: 1 }); + }); + + it("failed processing transitions pending → inProgress → Error → 
back to pending", async () => { + const service = cds.outboxed(await cds.connect.to("NotificationService")).tx(context); + await service.send("errorEvent", { to: "to", subject: "subject", body: "body" }); + await commitAndOpenNew(); + + expect((await getTenantStats(123))[StatusField.Pending]).toBe(1); + expect((await getTenantStats(123))[StatusField.InProgress]).toBe(0); + + await processEventQueue(tx.context, "CAP_OUTBOX", "NotificationService"); + await commitAndOpenNew(); + + // Error state means the event will be retried → counts as pending + const tenantStats = await getTenantStats(123); + expect(tenantStats[StatusField.Pending]).toBe(1); + expect(tenantStats[StatusField.InProgress]).toBe(0); + + const globalStats = await getGlobalStats(); + expect(globalStats[StatusField.Pending]).toBe(1); + expect(globalStats[StatusField.InProgress]).toBe(0); + + await testHelper.selectEventQueueAndExpectError(tx, { expectedLength: 1 }); + }); + }); + + const commitAndOpenNew = async () => { + await tx.commit(); + context = new cds.EventContext({ user: "testUser", tenant: 123 }); + tx = cds.tx(context); + }; +}); diff --git a/test/eventQueueStats.test.js b/test/eventQueueStats.test.js new file mode 100644 index 00000000..0e908046 --- /dev/null +++ b/test/eventQueueStats.test.js @@ -0,0 +1,185 @@ +"use strict"; + +const path = require("path"); + +const cds = require("@sap/cds/lib"); + +const mockRedis = require("./mocks/redisMock"); +jest.mock("../src/shared/redis", () => mockRedis); + +const eventQueue = require("../src"); +const { + StatusField, + incrementCounters, + decrementCounters, + adjustTenantCounter, + adjustGlobalCounter, + getTenantStats, + getGlobalStats, + deleteTenantStats, + resetInProgressCounters, +} = require("../src/shared/eventQueueStats"); + +const project = __dirname + "/.."; +cds.test(project); + +describe("eventQueueStats", () => { + beforeAll(async () => { + const configFilePath = path.join(__dirname, "asset", "config.yml"); + await 
eventQueue.initialize({ + configFilePath, + processEventsAfterPublish: false, + registerAsEventProcessor: false, + }); + }); + + beforeEach(() => { + mockRedis.clearState(); + }); + + afterAll(() => cds.shutdown); + + describe("incrementCounters / decrementCounters", () => { + it("increments tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + expect(tenant.inProgress).toBe(0); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("decrements tenant and global counters", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await decrementCounters("t1", StatusField.Pending, 2); + + const tenant = await getTenantStats("t1"); + expect(tenant.pending).toBe(3); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + }); + + it("multiple tenants are tracked independently", async () => { + await incrementCounters("t1", StatusField.Pending, 2); + await incrementCounters("t2", StatusField.Pending, 5); + + expect((await getTenantStats("t1")).pending).toBe(2); + expect((await getTenantStats("t2")).pending).toBe(5); + }); + + it("global counter aggregates across all tenants", async () => { + await incrementCounters("t1", StatusField.InProgress, 1); + await incrementCounters("t2", StatusField.InProgress, 4); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(5); + }); + }); + + describe("adjustTenantCounter", () => { + it("creates hash with zero base when first incremented", async () => { + await adjustTenantCounter("t1", StatusField.InProgress, 1); + + const stats = await getTenantStats("t1"); + expect(stats.inProgress).toBe(1); + expect(stats.pending).toBe(0); + }); + + it("supports negative increments", async () => { + await adjustTenantCounter("t1", StatusField.Pending, 10); + await adjustTenantCounter("t1", StatusField.Pending, -3); + + 
expect((await getTenantStats("t1")).pending).toBe(7); + }); + }); + + describe("adjustGlobalCounter", () => { + it("increments the global counter for the given field", async () => { + await adjustGlobalCounter(StatusField.Pending, 7); + + const global = await getGlobalStats(); + expect(global.pending).toBe(7); + }); + }); + + describe("getTenantStats", () => { + it("returns all-zero object for unknown tenant", async () => { + const stats = await getTenantStats("unknown-tenant"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("getGlobalStats", () => { + it("returns all-zero object when no data exists", async () => { + const stats = await getGlobalStats(); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + }); + + describe("deleteTenantStats", () => { + it("removes the tenant hash", async () => { + await incrementCounters("t1", StatusField.Pending, 5); + await deleteTenantStats("t1"); + + const stats = await getTenantStats("t1"); + expect(stats).toEqual({ pending: 0, inProgress: 0 }); + }); + + it("does not throw when tenant does not exist", async () => { + await expect(deleteTenantStats("nonexistent")).resolves.toBeUndefined(); + }); + }); + + describe("resetInProgressCounters", () => { + it("resets global inProgress to 0 for all configured namespaces", async () => { + await incrementCounters("t1", StatusField.InProgress, 5); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.inProgress).toBe(0); + }); + + it("does not touch the pending counter", async () => { + await incrementCounters("t1", StatusField.Pending, 3); + await incrementCounters("t1", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const global = await getGlobalStats(); + expect(global.pending).toBe(3); + expect(global.inProgress).toBe(0); + }); + + it("resets inProgress in tenant hash keys found via scan", async () => { + await incrementCounters("t1", StatusField.InProgress, 4); + await 
incrementCounters("t2", StatusField.InProgress, 2); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + expect(t1.inProgress).toBe(0); + + const t2 = await getTenantStats("t2"); + expect(t2.inProgress).toBe(0); + }); + + it("preserves tenant pending counter after reset", async () => { + await incrementCounters("t1", StatusField.Pending, 7); + await incrementCounters("t1", StatusField.InProgress, 3); + + await resetInProgressCounters(); + + const t1 = await getTenantStats("t1"); + expect(t1.pending).toBe(7); + expect(t1.inProgress).toBe(0); + }); + + it("resolves without error when no keys exist", async () => { + await expect(resetInProgressCounters()).resolves.toBeUndefined(); + }); + }); +}); diff --git a/test/mocks/redisMock.js b/test/mocks/redisMock.js index a0f03202..1465ebcb 100644 --- a/test/mocks/redisMock.js +++ b/test/mocks/redisMock.js @@ -2,7 +2,8 @@ let state = {}; let testState = {}; -const _createMainClientAndConnect = async () => ({ + +const _buildClient = () => ({ get: async (key) => state[key]?.value ?? null, exists: async (key) => Object.prototype.hasOwnProperty.call(state, key), set: async (key, value, options) => { @@ -13,15 +14,92 @@ const _createMainClientAndConnect = async () => ({ testState[key] = { value, options }; return "OK"; }, - del: async (key) => delete state[key], + del: async (key) => { + const existed = Object.prototype.hasOwnProperty.call(state, key); + delete state[key]; + return existed ? 1 : 0; + }, + hIncrBy: async (key, field, increment) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }, + hSet: async (key, field, value) => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const isNew = !Object.prototype.hasOwnProperty.call(state[key].hash, field); + state[key].hash[field] = String(value); + return isNew ? 
1 : 0; + }, + hGetAll: async (key) => { + return state[key]?.hash ?? {}; + }, + scanIterator: ({ MATCH } = {}) => { + const regex = MATCH + ? new RegExp( + "^" + + MATCH.replace(/[.+^${}()|[\]\\]/g, "\\$&") + .replace(/\*/g, ".*") + .replace(/\?/g, ".") + + "$" + ) + : null; + const matchingKeys = Object.keys(state).filter((k) => !regex || regex.test(k)); + return (async function* () { + for (const key of matchingKeys) { + yield key; + } + })(); + }, + multi: () => { + const ops = []; + const pipeline = { + hIncrBy: (key, field, increment) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + const current = parseInt(state[key].hash[field] ?? "0", 10); + state[key].hash[field] = String(current + increment); + return current + increment; + }); + return pipeline; + }, + hSet: (key, field, value) => { + ops.push(async () => { + if (!state[key]) { + state[key] = { hash: {} }; + } + state[key].hash[field] = String(value); + }); + return pipeline; + }, + exec: async () => { + const results = []; + for (const op of ops) { + results.push(await op()); + } + return results; + }, + }; + return pipeline; + }, _: { state, }, }); +const _createMainClientAndConnect = async () => _buildClient(); + module.exports = { attachRedisUnsubscribeHandler: () => {}, subscribeRedisChannel: () => {}, + publishMessage: async () => {}, + isClusterMode: () => false, createClientAndConnect: _createMainClientAndConnect, createMainClientAndConnect: _createMainClientAndConnect, closeSubscribeClient: () => {}, diff --git a/test/redisPubSub.test.js b/test/redisPubSub.test.js index 37fcd9ed..24df9362 100644 --- a/test/redisPubSub.test.js +++ b/test/redisPubSub.test.js @@ -10,6 +10,7 @@ const setTimeoutSpy = jest.spyOn(global, "setTimeout").mockImplementation((first }); const distributedLock = require("../src/shared/distributedLock"); +const eventQueueStats = require("../src/shared/eventQueueStats"); const checkLockExistsSpy = jest.spyOn(distributedLock, 
"checkLockExists"); const config = require("../src/config"); const redisPub = require("../src/redis/redisPub"); @@ -67,6 +68,7 @@ describe("eventQueue Redis Events and DB Handlers", () => { eventQueue.registerEventQueueDbHandler(cds.db); loggerMock = mockLogger(); jest.spyOn(cds.utils, "uuid").mockReturnValue("6e31047a-d2b5-4e3c-83d8-deab20165956"); + jest.spyOn(eventQueueStats, "incrementCounters").mockResolvedValue(); }); beforeEach(async () => { diff --git a/tmp/newTask.txt b/tmp/newTask.txt new file mode 100644 index 00000000..36e16d3b --- /dev/null +++ b/tmp/newTask.txt @@ -0,0 +1,2 @@ +initialize only if instance number is zero. +initzialize only if is eventprocessor make sure app only reports for its processing namespace \ No newline at end of file