diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 7cc07bbac12..ab1e9f2fc77 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -4,24 +4,66 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getMessageSize } = require('../../dd-trace/src/datastreams') const { convertToTextMap } = require('./utils') +const MESSAGING_DESTINATION_KEY = 'messaging.destination.name' + class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static id = 'kafkajs' static operation = 'consume-batch' - start (ctx) { - const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx - - if (!this.config.dsmEnabled) return - for (const message of messages) { - if (!message || !message.headers) continue - const payloadSize = getMessageSize(message) - this.tracer.decodeDataStreamsContext(convertToTextMap(message.headers)) - const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'] - if (clusterId) { - edgeTags.push(`kafka_cluster_id:${clusterId}`) + bindStart (ctx) { + const { topic, partition, messages, groupId, clusterId } = ctx.extractedArgs || ctx + + // Extract parent context from first message headers if available + let childOf + if (messages && messages.length > 0 && messages[0]?.headers) { + const headers = convertToTextMap(messages[0].headers) + if (headers) { + childOf = this.tracer.extract('text_map', headers) + } + } + + const span = this.startSpan({ + childOf, + resource: topic, + type: 'worker', + meta: { + component: this.constructor.id, + 'kafka.topic': topic, + 'kafka.cluster_id': clusterId, + [MESSAGING_DESTINATION_KEY]: topic + }, + metrics: { + 'kafka.partition': partition, + 'kafka.batch_size': messages ? messages.length : 0 + } + }, ctx) + + // Add offset tags from first and last message in batch + if (messages && messages.length > 0) { + if (messages[0]?.offset) { + span.setTag('kafka.first_offset', messages[0].offset) + } + if (messages[messages.length - 1]?.offset) { + span.setTag('kafka.last_offset', messages[messages.length - 1].offset) + } + } + + // Data Streams Monitoring: process each message in the batch + if (this.config.dsmEnabled) { + for (const message of messages) { + if (!message || !message.headers) continue + const headers = convertToTextMap(message.headers) + const payloadSize = getMessageSize(message) + this.tracer.decodeDataStreamsContext(headers) + const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'] + if (clusterId) { + edgeTags.push(`kafka_cluster_id:${clusterId}`) + } + this.tracer.setCheckpoint(edgeTags, span, payloadSize) } - this.tracer.setCheckpoint(edgeTags, null, payloadSize) } + + return ctx.currentStore } } diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index aeee5ebbc57..dc2b1215d6f 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -434,6 +434,121 @@ describe('Plugin', () => { ) }) + describe('consumer (eachBatch)', () => { + let consumer + + beforeEach(async () => { + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.connect() + await consumer.subscribe({ topic: testTopic }) + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + it('should be instrumented', async () => { + const expectedSpanPromise = expectSpanWithDefaults({ + name: 'kafka.consume-batch', + service: 'test-kafka', + meta: { + 'span.kind': 'consumer', + component: 'kafkajs', + 'messaging.destination.name': testTopic + }, + resource: testTopic, + error: 0, + type: 'worker' + }) + + await consumer.run({ + eachBatch: async () => {} + }) + await sendMessages(kafka, testTopic, messages) + return expectedSpanPromise + }) + + it('should include batch size in metrics', async () => { + const expectedSpanPromise = agent.assertSomeTraces(traces => { + const span = traces[0][0] + expect(span).to.include({ + name: 'kafka.consume-batch', + resource: testTopic + }) + expect(span.metrics['kafka.batch_size']).to.equal(messages.length) + }) + + await consumer.run({ + eachBatch: async () => {} + }) + await sendMessages(kafka, testTopic, messages) + return expectedSpanPromise + }) + + it('should include first and last offset tags', async () => { + const expectedSpanPromise = agent.assertSomeTraces(traces => { + const span = traces[0][0] + expect(span.meta['kafka.first_offset']).to.exist + expect(span.meta['kafka.last_offset']).to.exist + }) + + await consumer.run({ + eachBatch: async () => {} + }) + await sendMessages(kafka, testTopic, messages) + return expectedSpanPromise + }) + + it('should run the consumer in the context of the consumer span', done => { + const firstSpan = tracer.scope().active() + + let eachBatch = async ({ batch }) => { + const currentSpan = tracer.scope().active() + + try { + expect(currentSpan).to.not.equal(firstSpan) + expect(currentSpan.context()._name).to.equal('kafka.consume-batch') + done() + } catch (e) { + done(e) + } finally { + eachBatch = () => {} // avoid being called for each message + } + } + + consumer.run({ eachBatch: (...args) => eachBatch(...args) }) + .then(() => sendMessages(kafka, testTopic, messages)) + .catch(done) + }) + + it('should be instrumented w/ error', async () => { + const fakeError = new Error('Oh No!') + const expectedSpanPromise = expectSpanWithDefaults({ + name: 'kafka.consume-batch', + service: 'test-kafka', + meta: { + [ERROR_TYPE]: fakeError.name, + [ERROR_MESSAGE]: fakeError.message, + [ERROR_STACK]: fakeError.stack, + component: 'kafkajs' + }, + resource: testTopic, + error: 1, + type: 'worker' + }) + + await consumer.subscribe({ topic: testTopic, fromBeginning: true }) + await consumer.run({ + eachBatch: async () => { + throw fakeError + } + }) + await sendMessages(kafka, testTopic, messages) + + return expectedSpanPromise + }) + }) + describe('data stream monitoring', () => { let consumer