66 changes: 54 additions & 12 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -4,24 +4,66 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
 const { convertToTextMap } = require('./utils')
 
+const MESSAGING_DESTINATION_KEY = 'messaging.destination.name'
+
 class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static id = 'kafkajs'
   static operation = 'consume-batch'
 
-  start (ctx) {
-    const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx
-
-    if (!this.config.dsmEnabled) return
-    for (const message of messages) {
-      if (!message || !message.headers) continue
-      const payloadSize = getMessageSize(message)
-      this.tracer.decodeDataStreamsContext(convertToTextMap(message.headers))
-      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
-      if (clusterId) {
-        edgeTags.push(`kafka_cluster_id:${clusterId}`)
-      }
-      this.tracer.setCheckpoint(edgeTags, null, payloadSize)
-    }
+  bindStart (ctx) {
+    const { topic, partition, messages, groupId, clusterId } = ctx.extractedArgs || ctx
+
+    // Extract parent context from first message headers if available
+    let childOf
+    if (messages && messages.length > 0 && messages[0]?.headers) {
+      const headers = convertToTextMap(messages[0].headers)
+      if (headers) {
+        childOf = this.tracer.extract('text_map', headers)
+      }
+    }
+
+    const span = this.startSpan({
+      childOf,
+      resource: topic,
+      type: 'worker',
+      meta: {
+        component: this.constructor.id,
+        'kafka.topic': topic,
+        'kafka.cluster_id': clusterId,
+        [MESSAGING_DESTINATION_KEY]: topic
+      },
+      metrics: {
+        'kafka.partition': partition,
+        'kafka.batch_size': messages ? messages.length : 0
+      }
+    }, ctx)
+
+    // Add offset tags from first and last message in batch
+    if (messages && messages.length > 0) {
+      if (messages[0]?.offset) {
+        span.setTag('kafka.first_offset', messages[0].offset)
+      }
+      if (messages[messages.length - 1]?.offset) {
+        span.setTag('kafka.last_offset', messages[messages.length - 1].offset)
+      }
+    }
+
+    // Data Streams Monitoring: process each message in the batch
+    if (this.config.dsmEnabled) {
+      for (const message of messages) {
+        if (!message || !message.headers) continue
+        const headers = convertToTextMap(message.headers)
+        const payloadSize = getMessageSize(message)
+        this.tracer.decodeDataStreamsContext(headers)
+        const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+        if (clusterId) {
+          edgeTags.push(`kafka_cluster_id:${clusterId}`)
+        }
+        this.tracer.setCheckpoint(edgeTags, span, payloadSize)
+      }
+    }
+
+    return ctx.currentStore
   }
 }

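Note on trace propagation: bindStart now extracts a parent context from the first message's headers, so a producer that publishes messages with injected trace headers gets the downstream kafka.consume-batch span parented to its trace. The kafkajs producer instrumentation injects those headers automatically; the sketch below performs the injection by hand only to make the mechanism explicit, and the broker address, topic, and span name are illustrative rather than part of this change.

const tracer = require('dd-trace').init()
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'producer-demo', brokers: ['localhost:9092'] })
const producer = kafka.producer()

async function send () {
  await producer.connect()
  const span = tracer.startSpan('kafka.produce-demo') // illustrative span name
  const headers = {}
  // Serialize the span context into the message headers; the batch
  // consumer's bindStart extracts this from messages[0].headers.
  tracer.inject(span.context(), 'text_map', headers)
  await producer.send({
    topic: 'my-topic', // hypothetical topic
    messages: [{ value: 'hello', headers }]
  })
  span.finish()
  await producer.disconnect()
}

send().catch(console.error)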
115 changes: 115 additions & 0 deletions packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -434,6 +434,121 @@ describe('Plugin', () => {
         )
       })
 
+      describe('consumer (eachBatch)', () => {
+        let consumer
+
+        beforeEach(async () => {
+          consumer = kafka.consumer({ groupId: 'test-group' })
+          await consumer.connect()
+          await consumer.subscribe({ topic: testTopic })
+        })
+
+        afterEach(async () => {
+          await consumer.disconnect()
+        })
+
+        it('should be instrumented', async () => {
+          const expectedSpanPromise = expectSpanWithDefaults({
+            name: 'kafka.consume-batch',
+            service: 'test-kafka',
+            meta: {
+              'span.kind': 'consumer',
+              component: 'kafkajs',
+              'messaging.destination.name': testTopic
+            },
+            resource: testTopic,
+            error: 0,
+            type: 'worker'
+          })
+
+          await consumer.run({
+            eachBatch: async () => {}
+          })
+          await sendMessages(kafka, testTopic, messages)
+          return expectedSpanPromise
+        })
+
+        it('should include batch size in metrics', async () => {
+          const expectedSpanPromise = agent.assertSomeTraces(traces => {
+            const span = traces[0][0]
+            expect(span).to.include({
+              name: 'kafka.consume-batch',
+              resource: testTopic
+            })
+            expect(span.metrics['kafka.batch_size']).to.equal(messages.length)
+          })
+
+          await consumer.run({
+            eachBatch: async () => {}
+          })
+          await sendMessages(kafka, testTopic, messages)
+          return expectedSpanPromise
+        })
+
+        it('should include first and last offset tags', async () => {
+          const expectedSpanPromise = agent.assertSomeTraces(traces => {
+            const span = traces[0][0]
+            expect(span.meta['kafka.first_offset']).to.exist
+            expect(span.meta['kafka.last_offset']).to.exist
+          })
+
+          await consumer.run({
+            eachBatch: async () => {}
+          })
+          await sendMessages(kafka, testTopic, messages)
+          return expectedSpanPromise
+        })
+
+        it('should run the consumer in the context of the consumer span', done => {
+          const firstSpan = tracer.scope().active()
+
+          let eachBatch = async ({ batch }) => {
+            const currentSpan = tracer.scope().active()
+
+            try {
+              expect(currentSpan).to.not.equal(firstSpan)
+              expect(currentSpan.context()._name).to.equal('kafka.consume-batch')
+              done()
+            } catch (e) {
+              done(e)
+            } finally {
+              eachBatch = () => {} // avoid being called again for subsequent batches
+            }
+          }
+
+          consumer.run({ eachBatch: (...args) => eachBatch(...args) })
+            .then(() => sendMessages(kafka, testTopic, messages))
+            .catch(done)
+        })
+
+        it('should be instrumented w/ error', async () => {
+          const fakeError = new Error('Oh No!')
+          const expectedSpanPromise = expectSpanWithDefaults({
+            name: 'kafka.consume-batch',
+            service: 'test-kafka',
+            meta: {
+              [ERROR_TYPE]: fakeError.name,
+              [ERROR_MESSAGE]: fakeError.message,
+              [ERROR_STACK]: fakeError.stack,
+              component: 'kafkajs'
+            },
+            resource: testTopic,
+            error: 1,
+            type: 'worker'
+          })
+
+          await consumer.subscribe({ topic: testTopic, fromBeginning: true })
+          await consumer.run({
+            eachBatch: async () => {
+              throw fakeError
+            }
+          })
+          await sendMessages(kafka, testTopic, messages)
+
+          return expectedSpanPromise
+        })
+      })
+
       describe('data stream monitoring', () => {
         let consumer
 
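For context, a minimal sketch of the application-side consumer this plugin instruments, assuming a local broker and a hypothetical topic name. The tracer must be initialized before kafkajs is required so the module can be patched; inside eachBatch, tracer.scope().active() should then return the kafka.consume-batch span created by bindStart above.

const tracer = require('dd-trace').init() // must run before kafkajs is required
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'batch-demo', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'batch-demo-group' })

// Hypothetical application handler for a single message
function handleMessage (message) {
  console.log(message.offset, message.value && message.value.toString())
}

async function main () {
  await consumer.connect()
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true })
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
      // This callback runs inside the kafka.consume-batch span.
      for (const message of batch.messages) {
        handleMessage(message)
        resolveOffset(message.offset)
        await heartbeat()
      }
    }
  })
}

main().catch(console.error)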