From 37d33afea311255c36aa4cf9748629952de51664 Mon Sep 17 00:00:00 2001
From: Samuel Lukes
Date: Mon, 17 Nov 2025 22:09:33 +0100
Subject: [PATCH] fix(kafkajs): add APM tracing support for batch consumer

The batch consumer plugin only handled Data Streams Monitoring (DSM) and
did not create APM trace spans. This adds the missing bindStart() method
to create spans for eachBatch callbacks, mirroring how the regular
consumer plugin creates spans for eachMessage callbacks.

Changes:
- Add bindStart() method to create APM trace spans for batch consumption
- Extract the parent context from the first message's headers for
  distributed tracing
- Add span tags: kafka.topic, kafka.partition, kafka.batch_size
- Add kafka.first_offset and kafka.last_offset tags for batch boundaries
- Set the span resource to the topic name and the span type to 'worker'
- Integrate DSM checkpoints with APM spans (pass the span to setCheckpoint)
- Add test coverage for batch consumer APM tracing

This ensures that batch consumer operations are traced in APM, providing
visibility into batch consumption performance and enabling distributed
tracing across Kafka producers and consumers.

Fixes missing APM spans for @confluentinc/kafka-javascript and kafkajs
batch consumers when using eachBatch callbacks.
---
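For context, a minimal sketch of the kind of kafkajs batch consumer this
patch instruments; the broker address, topic, and group id below are
placeholders rather than values taken from the patch or its test suite:

require('dd-trace').init() // tracer must load before kafkajs is required

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ brokers: ['localhost:9092'] }) // placeholder broker
const consumer = kafka.consumer({ groupId: 'example-group' }) // placeholder

async function main () {
  await consumer.connect()
  await consumer.subscribe({ topic: 'example-topic', fromBeginning: true })
  await consumer.run({
    // With this patch, each eachBatch invocation runs inside a
    // kafka.consume-batch span; batch.messages drives kafka.batch_size,
    // kafka.first_offset, and kafka.last_offset.
    eachBatch: async ({ batch }) => {
      for (const message of batch.messages) {
        console.log(batch.topic, batch.partition, message.offset)
      }
    }
  })
}

main().catch(console.error)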
 .../src/batch-consumer.js                     |  66 ++++++++--
 .../datadog-plugin-kafkajs/test/index.spec.js | 115 ++++++++++++++++++
 2 files changed, 169 insertions(+), 12 deletions(-)

diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
index 7cc07bbac12..ab1e9f2fc77 100644
--- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -4,24 +4,66 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
 const { convertToTextMap } = require('./utils')
 
+const MESSAGING_DESTINATION_KEY = 'messaging.destination.name'
+
 class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static id = 'kafkajs'
   static operation = 'consume-batch'
 
-  start (ctx) {
-    const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx
-
-    if (!this.config.dsmEnabled) return
-    for (const message of messages) {
-      if (!message || !message.headers) continue
-      const payloadSize = getMessageSize(message)
-      this.tracer.decodeDataStreamsContext(convertToTextMap(message.headers))
-      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
-      if (clusterId) {
-        edgeTags.push(`kafka_cluster_id:${clusterId}`)
+  bindStart (ctx) {
+    const { topic, partition, messages, groupId, clusterId } = ctx.extractedArgs || ctx
+
+    // Extract the parent context from the first message's headers, if available
+    let childOf
+    if (messages && messages.length > 0 && messages[0]?.headers) {
+      const headers = convertToTextMap(messages[0].headers)
+      if (headers) {
+        childOf = this.tracer.extract('text_map', headers)
+      }
+    }
+
+    const span = this.startSpan({
+      childOf,
+      resource: topic,
+      type: 'worker',
+      meta: {
+        component: this.constructor.id,
+        'kafka.topic': topic,
+        'kafka.cluster_id': clusterId,
+        [MESSAGING_DESTINATION_KEY]: topic
+      },
+      metrics: {
+        'kafka.partition': partition,
+        'kafka.batch_size': messages ? messages.length : 0
+      }
+    }, ctx)
+
+    // Add offset tags from the first and last message in the batch
+    if (messages && messages.length > 0) {
+      if (messages[0]?.offset) {
+        span.setTag('kafka.first_offset', messages[0].offset)
+      }
+      if (messages[messages.length - 1]?.offset) {
+        span.setTag('kafka.last_offset', messages[messages.length - 1].offset)
+      }
+    }
+
+    // Data Streams Monitoring: checkpoint each message (messages may be absent)
+    if (this.config.dsmEnabled && Array.isArray(messages)) {
+      for (const message of messages) {
+        if (!message || !message.headers) continue
+        const headers = convertToTextMap(message.headers)
+        const payloadSize = getMessageSize(message)
+        this.tracer.decodeDataStreamsContext(headers)
+        const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+        if (clusterId) {
+          edgeTags.push(`kafka_cluster_id:${clusterId}`)
+        }
+        this.tracer.setCheckpoint(edgeTags, span, payloadSize)
       }
-      this.tracer.setCheckpoint(edgeTags, null, payloadSize)
     }
+
+    return ctx.currentStore
   }
 }
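The childOf extraction in bindStart() pairs with header injection on the
producer side. A minimal sketch of that producer half, assuming the
dd-trace kafkajs producer instrumentation is active; the broker, topic,
and payload are placeholders, and the injected header names depend on the
configured propagation style:

require('dd-trace').init() // tracer must load before kafkajs is required

const { Kafka } = require('kafkajs')

async function produce () {
  const kafka = new Kafka({ brokers: ['localhost:9092'] }) // placeholder
  const producer = kafka.producer()
  await producer.connect()
  // The producer plugin injects the active trace context into message
  // headers; on the consumer side, bindStart() reads messages[0].headers
  // and extracts it as the parent of the kafka.consume-batch span.
  await producer.send({
    topic: 'example-topic', // placeholder topic
    messages: [{ key: 'key', value: 'value' }]
  })
  await producer.disconnect()
}

produce().catch(console.error)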
diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js
index aeee5ebbc57..dc2b1215d6f 100644
--- a/packages/datadog-plugin-kafkajs/test/index.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -434,6 +434,121 @@ describe('Plugin', () => {
       )
     })
 
+    describe('consumer (eachBatch)', () => {
+      let consumer
+
+      beforeEach(async () => {
+        consumer = kafka.consumer({ groupId: 'test-group' })
+        await consumer.connect()
+        await consumer.subscribe({ topic: testTopic })
+      })
+
+      afterEach(async () => {
+        await consumer.disconnect()
+      })
+
+      it('should be instrumented', async () => {
+        const expectedSpanPromise = expectSpanWithDefaults({
+          name: 'kafka.consume-batch',
+          service: 'test-kafka',
+          meta: {
+            'span.kind': 'consumer',
+            component: 'kafkajs',
+            'messaging.destination.name': testTopic
+          },
+          resource: testTopic,
+          error: 0,
+          type: 'worker'
+        })
+
+        await consumer.run({
+          eachBatch: async () => {}
+        })
+        await sendMessages(kafka, testTopic, messages)
+        return expectedSpanPromise
+      })
+
+      it('should include batch size in metrics', async () => {
+        const expectedSpanPromise = agent.assertSomeTraces(traces => {
+          const span = traces[0][0]
+          expect(span).to.include({
+            name: 'kafka.consume-batch',
+            resource: testTopic
+          })
+          expect(span.metrics['kafka.batch_size']).to.equal(messages.length)
+        })
+
+        await consumer.run({
+          eachBatch: async () => {}
+        })
+        await sendMessages(kafka, testTopic, messages)
+        return expectedSpanPromise
+      })
+
+      it('should include first and last offset tags', async () => {
+        const expectedSpanPromise = agent.assertSomeTraces(traces => {
+          const span = traces[0][0]
+          expect(span.meta['kafka.first_offset']).to.exist
+          expect(span.meta['kafka.last_offset']).to.exist
+        })
+
+        await consumer.run({
+          eachBatch: async () => {}
+        })
+        await sendMessages(kafka, testTopic, messages)
+        return expectedSpanPromise
+      })
+
+      it('should run the consumer in the context of the consumer span', done => {
+        const firstSpan = tracer.scope().active()
+
+        let eachBatch = async ({ batch }) => {
+          const currentSpan = tracer.scope().active()
+
+          try {
+            expect(currentSpan).to.not.equal(firstSpan)
+            expect(currentSpan.context()._name).to.equal('kafka.consume-batch')
+            done()
+          } catch (e) {
+            done(e)
+          } finally {
+            eachBatch = () => {} // avoid being called again for each batch
+          }
+        }
+
+        consumer.run({ eachBatch: (...args) => eachBatch(...args) })
+          .then(() => sendMessages(kafka, testTopic, messages))
+          .catch(done)
+      })
+
+      it('should be instrumented w/ error', async () => {
+        const fakeError = new Error('Oh No!')
+        const expectedSpanPromise = expectSpanWithDefaults({
+          name: 'kafka.consume-batch',
+          service: 'test-kafka',
+          meta: {
+            [ERROR_TYPE]: fakeError.name,
+            [ERROR_MESSAGE]: fakeError.message,
+            [ERROR_STACK]: fakeError.stack,
+            component: 'kafkajs'
+          },
+          resource: testTopic,
+          error: 1,
+          type: 'worker'
+        })
+
+        await consumer.subscribe({ topic: testTopic, fromBeginning: true })
+        await consumer.run({
+          eachBatch: async () => {
+            throw fakeError
+          }
+        })
+        await sendMessages(kafka, testTopic, messages)
+
+        return expectedSpanPromise
+      })
+    })
+
     describe('data stream monitoring', () => {
       let consumer