From 2ac6efe2aff27f2283c02c08c0f88fac0ceb2968 Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Wed, 18 Feb 2026 15:01:54 -0500 Subject: [PATCH 1/4] fix(kafkajs): include kafka_cluster_id in DSM backlog offset tracking DSM checkpoints correctly included kafka_cluster_id in edge tags, but the backlog/offset tracking (which feeds lag metrics like data_streams.kafka.lag_messages and data_streams.kafka.lag_seconds) did not. This caused cross-cluster offset mixing when the same topic exists on multiple Kafka clusters, producing wildly incorrect lag values. Thread clusterId through to setOffset calls for both producer and consumer commit paths so that backlog entries are scoped per cluster. DSMON-1226 Co-Authored-By: Claude Opus 4.6 --- .../datadog-instrumentations/src/kafkajs.js | 35 ++++++++++--------- .../datadog-plugin-kafkajs/src/consumer.js | 8 +++-- .../datadog-plugin-kafkajs/src/producer.js | 11 ++++-- .../datadog-plugin-kafkajs/test/dsm.spec.js | 15 +++++--- 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 57740c8a03c..e32fcb07dc3 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -24,22 +24,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error') const disabledHeaderWeakSet = new WeakSet() -function commitsFromEvent (event) { - const { payload: { groupId, topics } } = event - const commitList = [] - for (const { topic, partitions } of topics) { - for (const { partition, offset } of partitions) { - commitList.push({ - groupId, - partition, - offset, - topic, - }) - } - } - consumerCommitCh.publish(commitList) -} - addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => { class Kafka extends BaseKafka { constructor (options) { @@ -132,6 +116,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf } const kafkaClusterIdPromise = getKafkaClusterId(this) + let resolvedClusterId = null const eachMessageExtractor = (args, clusterId) => { const { topic, partition, message } = args[0] @@ -146,13 +131,29 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const consumer = createConsumer.apply(this, arguments) - consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent) + consumer.on(consumer.events.COMMIT_OFFSETS, (event) => { + const { payload: { groupId: commitGroupId, topics } } = event + const commitList = [] + for (const { topic, partitions } of topics) { + for (const { partition, offset } of partitions) { + commitList.push({ + groupId: commitGroupId, + partition, + offset, + topic, + clusterId: resolvedClusterId, + }) + } + } + consumerCommitCh.publish(commitList) + }) const run = consumer.run const groupId = arguments[0].groupId consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { const wrapConsume = (clusterId) => { + resolvedClusterId = clusterId return run({ eachMessage: wrappedCallback( eachMessage, diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 4638ab0ca8f..d5813ef953a 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { * @returns {ConsumerBacklog} */ transformCommit (commit) { - const { groupId, partition, offset, topic } = commit - return { + const { groupId, partition, offset, topic, clusterId } = commit + const backlog = { partition, topic, type: 'kafka_commit', offset: Number(offset), consumer_group: groupId, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } commit (commitList) { diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 70d0a9bfdd0..17fef7bd3d8 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin { * @param {ProducerResponseItem} response * @returns {ProducerBacklog} */ - transformProduceResponse (response) { + transformProduceResponse (response, clusterId) { // In produce protocol >=v3, the offset key changes from `offset` to `baseOffset` const { topicName: topic, partition, offset, baseOffset } = response const offsetAsLong = offset || baseOffset - return { + const backlog = { type: 'kafka_produce', partition, offset: offsetAsLong ? Number(offsetAsLong) : undefined, topic, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } /** @@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { */ commit (ctx) { const commitList = ctx.result + const clusterId = ctx.clusterId if (!this.config.dsmEnabled) return if (!commitList || !Array.isArray(commitList)) return @@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { 'offset', 'topic', ] - for (const commit of commitList.map(this.transformProduceResponse)) { + for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) { if (keys.some(key => !commit.hasOwnProperty(key))) continue this.tracer.setOffset(commit) } diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 685fb2f8498..6c527f2b791 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -219,18 +219,25 @@ describe('Plugin', () => { assert.strictEqual(runArg?.offset, commitMeta.offset) assert.strictEqual(runArg?.partition, commitMeta.partition) assert.strictEqual(runArg?.topic, commitMeta.topic) - assertObjectContains(runArg, { + const expectedBacklog = { type: 'kafka_commit', consumer_group: 'test-group', - }) + } + if (clusterIdAvailable) { + expectedBacklog.kafka_cluster_id = testKafkaClusterId + } + assertObjectContains(runArg, expectedBacklog) }) } it('Should add backlog on producer response', async () => { await sendMessages(kafka, testTopic, messages) sinon.assert.calledOnce(setOffsetSpy) - const { topic } = setOffsetSpy.lastCall.args[0] - assert.strictEqual(topic, testTopic) + const runArg = setOffsetSpy.lastCall.args[0] + assert.strictEqual(runArg.topic, testTopic) + if (clusterIdAvailable) { + assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId) + } }) }) }) From 871e89d41d8f55a373ee50d464e8a92cdad2b55a Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Thu, 19 Feb 2026 14:52:08 -0500 Subject: [PATCH 2/4] test(kafkajs): add comments explaining clusterIdAvailable version guard Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/datadog-plugin-kafkajs/test/dsm.spec.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 6c527f2b791..68b3977c0ea 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -24,6 +24,7 @@ const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'] } + // kafka_cluster_id is only available in kafkajs >=1.13 if (clusterIdAvailable) { edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`) } @@ -223,6 +224,7 @@ describe('Plugin', () => { type: 'kafka_commit', consumer_group: 'test-group', } + // kafka_cluster_id is only available in kafkajs >=1.13 if (clusterIdAvailable) { expectedBacklog.kafka_cluster_id = testKafkaClusterId } @@ -235,6 +237,7 @@ describe('Plugin', () => { sinon.assert.calledOnce(setOffsetSpy) const runArg = setOffsetSpy.lastCall.args[0] assert.strictEqual(runArg.topic, testTopic) + // kafka_cluster_id is only available in kafkajs >=1.13 if (clusterIdAvailable) { assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId) } From 15f2c0ac3c2ce83454239677ec8115347d3de973 Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Thu, 19 Feb 2026 14:53:08 -0500 Subject: [PATCH 3/4] test(kafkajs): remove irrelevant comment from getDsmPathwayHash Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/datadog-plugin-kafkajs/test/dsm.spec.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 68b3977c0ea..62e8134ee0d 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -24,7 +24,6 @@ const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'] } - // kafka_cluster_id is only available in kafkajs >=1.13 if (clusterIdAvailable) { edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`) } From b3d6e3c95f7933809a98ace87c06d6823a56dcf3 Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Mon, 23 Feb 2026 10:35:51 -0500 Subject: [PATCH 4/4] fix(kafkajs): add comment explaining resolvedClusterId safety Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/datadog-instrumentations/src/kafkajs.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index e32fcb07dc3..70ca2686eaf 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -153,6 +153,8 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { const wrapConsume = (clusterId) => { + // In kafkajs COMMIT_OFFSETS always happens in the context of one synchronous run + // So this will always reference a correct cluster id resolvedClusterId = clusterId return run({ eachMessage: wrappedCallback(