Skip to content
Open
37 changes: 20 additions & 17 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

const disabledHeaderWeakSet = new WeakSet()

function commitsFromEvent (event) {
  // Flatten the COMMIT_OFFSETS event payload into one commit entry per
  // (topic, partition) pair, then publish the whole list on the commit channel.
  const { groupId, topics } = event.payload
  const commitList = topics.flatMap(({ topic, partitions }) =>
    partitions.map(({ partition, offset }) => ({
      groupId,
      partition,
      offset,
      topic,
    }))
  )
  consumerCommitCh.publish(commitList)
}

addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => {
class Kafka extends BaseKafka {
constructor (options) {
Expand Down Expand Up @@ -132,6 +116,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
}

const kafkaClusterIdPromise = getKafkaClusterId(this)
let resolvedClusterId = null

const eachMessageExtractor = (args, clusterId) => {
const { topic, partition, message } = args[0]
Expand All @@ -146,13 +131,31 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf

const consumer = createConsumer.apply(this, arguments)

consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
consumer.on(consumer.events.COMMIT_OFFSETS, (event) => {
const { payload: { groupId: commitGroupId, topics } } = event
const commitList = []
for (const { topic, partitions } of topics) {
for (const { partition, offset } of partitions) {
commitList.push({
groupId: commitGroupId,
partition,
offset,
topic,
clusterId: resolvedClusterId,
})
}
}
consumerCommitCh.publish(commitList)
})

const run = consumer.run
const groupId = arguments[0].groupId

consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
const wrapConsume = (clusterId) => {
// In kafkajs COMMIT_OFFSETS always happens in the context of one synchronous run
// So this will always reference a correct cluster id
resolvedClusterId = clusterId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on an assumption of an implementation detail to work, which is that COMMIT_OFFSETS will always happen in the context of a run, synchronously, and that no 2 runs can run concurrently. Have you validated that this assumption is correct? If yes, then I would add a comment clarifying that as it's critical for this to work properly and for future readers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I checked that's the case and added a comment
There should be only one run (here), and COMMIT_OFFSETS only happens in the offsetManager, which is only used within the context of run.

return run({
eachMessage: wrappedCallback(
eachMessage,
Expand Down
8 changes: 6 additions & 2 deletions packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
* @returns {ConsumerBacklog}
*/
transformCommit (commit) {
const { groupId, partition, offset, topic } = commit
return {
const { groupId, partition, offset, topic, clusterId } = commit
const backlog = {
partition,
topic,
type: 'kafka_commit',
offset: Number(offset),
consumer_group: groupId,
}
if (clusterId) {
backlog.kafka_cluster_id = clusterId
}
return backlog
}

commit (commitList) {
Expand Down
11 changes: 8 additions & 3 deletions packages/datadog-plugin-kafkajs/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin {
* @param {ProducerResponseItem} response
* @returns {ProducerBacklog}
*/
transformProduceResponse (response) {
transformProduceResponse (response, clusterId) {
// In produce protocol >=v3, the offset key changes from `offset` to `baseOffset`
const { topicName: topic, partition, offset, baseOffset } = response
const offsetAsLong = offset || baseOffset
return {
const backlog = {
type: 'kafka_produce',
partition,
offset: offsetAsLong ? Number(offsetAsLong) : undefined,
topic,
}
if (clusterId) {
backlog.kafka_cluster_id = clusterId
}
return backlog
}

/**
Expand All @@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
*/
commit (ctx) {
const commitList = ctx.result
const clusterId = ctx.clusterId

if (!this.config.dsmEnabled) return
if (!commitList || !Array.isArray(commitList)) return
Expand All @@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
'offset',
'topic',
]
for (const commit of commitList.map(this.transformProduceResponse)) {
for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) {
if (keys.some(key => !commit.hasOwnProperty(key))) continue
this.tracer.setOffset(commit)
}
Expand Down
17 changes: 13 additions & 4 deletions packages/datadog-plugin-kafkajs/test/dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,27 @@ describe('Plugin', () => {
assert.strictEqual(runArg?.offset, commitMeta.offset)
assert.strictEqual(runArg?.partition, commitMeta.partition)
assert.strictEqual(runArg?.topic, commitMeta.topic)
assertObjectContains(runArg, {
const expectedBacklog = {
type: 'kafka_commit',
consumer_group: 'test-group',
})
}
// kafka_cluster_id is only available in kafkajs >=1.13
if (clusterIdAvailable) {
expectedBacklog.kafka_cluster_id = testKafkaClusterId
}
assertObjectContains(runArg, expectedBacklog)
})
}

it('Should add backlog on producer response', async () => {
  await sendMessages(kafka, testTopic, messages)
  sinon.assert.calledOnce(setOffsetSpy)
  // Inspect the backlog object the plugin handed to tracer.setOffset.
  const backlogArg = setOffsetSpy.lastCall.args[0]
  assert.strictEqual(backlogArg.topic, testTopic)
  // kafka_cluster_id is only available in kafkajs >=1.13
  if (clusterIdAvailable) {
    assert.strictEqual(backlogArg.kafka_cluster_id, testKafkaClusterId)
  }
})
})
})
Expand Down
Loading