From 2ac6efe2aff27f2283c02c08c0f88fac0ceb2968 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 18 Feb 2026 15:01:54 -0500
Subject: [PATCH 1/2] fix(kafkajs): include kafka_cluster_id in DSM backlog
 offset tracking

DSM checkpoints correctly included kafka_cluster_id in edge tags, but
the backlog/offset tracking (which feeds lag metrics like
data_streams.kafka.lag_messages and data_streams.kafka.lag_seconds) did
not. This caused cross-cluster offset mixing when the same topic exists
on multiple Kafka clusters, producing wildly incorrect lag values.

Thread clusterId through to setOffset calls for both producer and
consumer commit paths so that backlog entries are scoped per cluster.

DSMON-1226

Co-Authored-By: Claude Opus 4.6
---
 .../datadog-instrumentations/src/kafkajs.js   | 35 ++++++++++---------
 .../datadog-plugin-kafkajs/src/consumer.js    |  8 +++--
 .../datadog-plugin-kafkajs/src/producer.js    | 11 ++++--
 .../datadog-plugin-kafkajs/test/dsm.spec.js   | 15 +++++---
 4 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js
index 57740c8a03c..e32fcb07dc3 100644
--- a/packages/datadog-instrumentations/src/kafkajs.js
+++ b/packages/datadog-instrumentations/src/kafkajs.js
@@ -24,22 +24,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')
 
 const disabledHeaderWeakSet = new WeakSet()
 
-function commitsFromEvent (event) {
-  const { payload: { groupId, topics } } = event
-  const commitList = []
-  for (const { topic, partitions } of topics) {
-    for (const { partition, offset } of partitions) {
-      commitList.push({
-        groupId,
-        partition,
-        offset,
-        topic,
-      })
-    }
-  }
-  consumerCommitCh.publish(commitList)
-}
-
 addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => {
   class Kafka extends BaseKafka {
     constructor (options) {
@@ -132,6 +116,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
       }
 
       const kafkaClusterIdPromise = getKafkaClusterId(this)
+      let resolvedClusterId = null
 
       const eachMessageExtractor = (args, clusterId) => {
        const { topic, partition, message } = args[0]
@@ -146,13 +131,29 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
 
       const consumer = createConsumer.apply(this, arguments)
 
-      consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
+      consumer.on(consumer.events.COMMIT_OFFSETS, (event) => {
+        const { payload: { groupId: commitGroupId, topics } } = event
+        const commitList = []
+        for (const { topic, partitions } of topics) {
+          for (const { partition, offset } of partitions) {
+            commitList.push({
+              groupId: commitGroupId,
+              partition,
+              offset,
+              topic,
+              clusterId: resolvedClusterId,
+            })
+          }
+        }
+        consumerCommitCh.publish(commitList)
+      })
 
       const run = consumer.run
 
       const groupId = arguments[0].groupId
       consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
         const wrapConsume = (clusterId) => {
+          resolvedClusterId = clusterId
           return run({
             eachMessage: wrappedCallback(
               eachMessage,
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 4638ab0ca8f..d5813ef953a 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
    * @returns {ConsumerBacklog}
    */
   transformCommit (commit) {
-    const { groupId, partition, offset, topic } = commit
-    return {
+    const { groupId, partition, offset, topic, clusterId } = commit
+    const backlog = {
       partition,
       topic,
       type: 'kafka_commit',
       offset: Number(offset),
       consumer_group: groupId,
     }
+    if (clusterId) {
+      backlog.kafka_cluster_id = clusterId
+    }
+    return backlog
   }
 
   commit (commitList) {
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index 70d0a9bfdd0..17fef7bd3d8 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin {
    * @param {ProducerResponseItem} response
    * @returns {ProducerBacklog}
    */
-  transformProduceResponse (response) {
+  transformProduceResponse (response, clusterId) {
     // In produce protocol >=v3, the offset key changes from `offset` to `baseOffset`
     const { topicName: topic, partition, offset, baseOffset } = response
     const offsetAsLong = offset || baseOffset
-    return {
+    const backlog = {
       type: 'kafka_produce',
       partition,
       offset: offsetAsLong ? Number(offsetAsLong) : undefined,
       topic,
     }
+    if (clusterId) {
+      backlog.kafka_cluster_id = clusterId
+    }
+    return backlog
   }
 
   /**
@@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
    */
   commit (ctx) {
     const commitList = ctx.result
+    const clusterId = ctx.clusterId
 
     if (!this.config.dsmEnabled) return
     if (!commitList || !Array.isArray(commitList)) return
@@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
       'offset',
       'topic',
     ]
-    for (const commit of commitList.map(this.transformProduceResponse)) {
+    for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) {
       if (keys.some(key => !commit.hasOwnProperty(key))) continue
       this.tracer.setOffset(commit)
     }
diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
index 685fb2f8498..6c527f2b791 100644
--- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
@@ -219,18 +219,25 @@ describe('Plugin', () => {
           assert.strictEqual(runArg?.offset, commitMeta.offset)
           assert.strictEqual(runArg?.partition, commitMeta.partition)
           assert.strictEqual(runArg?.topic, commitMeta.topic)
-          assertObjectContains(runArg, {
+          const expectedBacklog = {
             type: 'kafka_commit',
             consumer_group: 'test-group',
-          })
+          }
+          if (clusterIdAvailable) {
+            expectedBacklog.kafka_cluster_id = testKafkaClusterId
+          }
+          assertObjectContains(runArg, expectedBacklog)
         })
       }
 
      it('Should add backlog on producer response', async () => {
        await sendMessages(kafka, testTopic, messages)
        sinon.assert.calledOnce(setOffsetSpy)
-        const { topic } = setOffsetSpy.lastCall.args[0]
-        assert.strictEqual(topic, testTopic)
+        const runArg = setOffsetSpy.lastCall.args[0]
+        assert.strictEqual(runArg.topic, testTopic)
+        if (clusterIdAvailable) {
+          assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId)
+        }
       })
     })
   })

From 4919b10bff2f37a075ac8507f8ab2863a4118d85 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 18 Feb 2026 15:22:39 -0500
Subject: [PATCH 2/2] fix(confluent): add kafka_cluster_id to DSM backlog and
 checkpoints

The confluent-kafka-javascript instrumentation never called
getKafkaClusterId, so cluster_id was missing from both DSM checkpoints
(edge tags) and backlog offset tracking. This caused incorrect pathway
hashes and cross-cluster offset mixing for lag metrics.
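
To make the offset mixing concrete, here is a schematic sketch of the
keying collision (the map, values, and cluster names below are
illustrative only; the actual fix is the cluster-scoped key that
updateLatestOffset builds in this patch):

    // Schematic: the same topic/partition exists on two clusters.
    const topic = 'orders'
    const partition = 0
    const offsets = new Map()
    // Before: the key omits the cluster, so cluster B's entry
    // silently overwrites cluster A's and the lag math mixes them.
    offsets.set(`${topic}:${partition}`, 1042) // cluster A
    offsets.set(`${topic}:${partition}`, 7)    // cluster B clobbers A
    // After: cluster-scoped keys let both backlogs coexist.
    offsets.set(`${topic}:${partition}:clusterA`, 1042)
    offsets.set(`${topic}:${partition}:clusterB`, 7)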

Changes:
- Extract getKafkaClusterId/isPromise to shared helpers/kafka.js
- Update kafkajs instrumentation to use shared helper
- Add cluster ID retrieval to confluent KafkaJS producer and consumer
- Thread clusterId through producer ctx, consumer extractedArgs, and
  offset tracking (updateLatestOffset key includes clusterId)

Note: The native module path (Producer/Consumer classes) does not yet
support cluster ID retrieval as it lacks an admin API.

DSMON-1226

Co-Authored-By: Claude Opus 4.6
---
 .../src/confluentinc-kafka-javascript.js      | 197 ++++++++++--------
 .../src/helpers/kafka.js                      |  47 +++++
 .../datadog-instrumentations/src/kafkajs.js   |  34 +--
 3 files changed, 163 insertions(+), 115 deletions(-)
 create mode 100644 packages/datadog-instrumentations/src/helpers/kafka.js

diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
index 0583bc212e5..bcbff32333f 100644
--- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
+++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
@@ -7,6 +7,7 @@ const {
   addHook,
   channel,
 } = require('./helpers/instrument')
+const { getKafkaClusterId, isPromise } = require('./helpers/kafka')
 
 // Create channels for Confluent Kafka JavaScript
 const channels = {
@@ -195,6 +196,8 @@ function instrumentKafkaJS (kafkaJS) {
 
   // Wrap the producer method if it exists
   if (typeof kafka?.producer === 'function') {
+    const kafkaProducerClusterIdPromise = getKafkaClusterId(kafka)
+
     shimmer.wrap(kafka, 'producer', function wrapProducerMethod (producerMethod) {
       return function wrappedProducerMethod () {
         const producer = producerMethod.apply(this, arguments)
@@ -211,48 +214,58 @@ function instrumentKafkaJS (kafkaJS) {
            return send.apply(this, arguments)
          }
 
-          const ctx = {
-            topic: payload?.topic,
-            messages: payload?.messages || [],
-            bootstrapServers: kafka._ddBrokers,
-            disableHeaderInjection: disabledHeaderWeakSet.has(producer),
-          }
-
-          return channels.producerStart.runStores(ctx, () => {
-            try {
-              const result = send.apply(this, arguments)
+          const wrappedSendWithClusterId = (clusterId) => {
+            const ctx = {
+              topic: payload?.topic,
+              messages: payload?.messages || [],
+              bootstrapServers: kafka._ddBrokers,
+              disableHeaderInjection: disabledHeaderWeakSet.has(producer),
+              clusterId,
+            }
 
-              result.then((res) => {
-                ctx.result = res
-                channels.producerCommit.publish(ctx)
-                channels.producerFinish.publish(ctx)
-              }, (err) => {
-                if (err) {
-                  // Fixes bug where we would inject message headers for kafka brokers
-                  // that don't support headers (version <0.11). On the error, we disable
-                  // header injection. Tnfortunately the error name / type is not more specific.
-                  // This approach is implemented by other tracers as well.
-                  if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
-                    disabledHeaderWeakSet.add(producer)
-                    log.error(
-                      // eslint-disable-next-line @stylistic/max-len
-                      'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.'
-                    )
-                  }
-                  ctx.error = err
-                  channels.producerError.publish(ctx)
-                }
-                channels.producerFinish.publish(ctx)
-              })
-
-              return result
-            } catch (e) {
-              ctx.error = e
-              channels.producerError.publish(ctx)
-              channels.producerFinish.publish(ctx)
-              throw e
-            }
-          })
+            return channels.producerStart.runStores(ctx, () => {
+              try {
+                const result = send.apply(this, arguments)
+
+                result.then((res) => {
+                  ctx.result = res
+                  channels.producerCommit.publish(ctx)
+                  channels.producerFinish.publish(ctx)
+                }, (err) => {
+                  if (err) {
+                    // Fixes bug where we would inject message headers for kafka brokers
+                    // that don't support headers (version <0.11). On the error, we disable
+                    // header injection. Unfortunately the error name / type is not more specific.
+                    // This approach is implemented by other tracers as well.
+                    if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
+                      disabledHeaderWeakSet.add(producer)
+                      log.error(
+                        // eslint-disable-next-line @stylistic/max-len
+                        'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.'
+                      )
+                    }
+                    ctx.error = err
+                    channels.producerError.publish(ctx)
+                  }
+                  channels.producerFinish.publish(ctx)
+                })
+
+                return result
+              } catch (e) {
+                ctx.error = e
+                channels.producerError.publish(ctx)
+                channels.producerFinish.publish(ctx)
+                throw e
+              }
+            })
+          }
+
+          if (isPromise(kafkaProducerClusterIdPromise)) {
+            return kafkaProducerClusterIdPromise.then((clusterId) => {
+              return wrappedSendWithClusterId(clusterId)
+            })
+          }
+          return wrappedSendWithClusterId(kafkaProducerClusterIdPromise)
        }
      })
    }
@@ -264,10 +277,13 @@ function instrumentKafkaJS (kafkaJS) {
 
   // Wrap the consumer method if it exists
   if (typeof kafka?.consumer === 'function') {
+    const kafkaConsumerClusterIdPromise = getKafkaClusterId(kafka)
+
     shimmer.wrap(kafka, 'consumer', function wrapConsumerMethod (consumerMethod) {
       return function wrappedConsumerMethod (config) {
         const consumer = consumerMethod.apply(this, arguments)
         const groupId = getGroupId(config)
+        let resolvedClusterId = null
 
         // Wrap the run method for handling message consumption
         if (typeof consumer?.run === 'function') {
@@ -277,49 +293,62 @@ function instrumentKafkaJS (kafkaJS) {
               return run.apply(this, arguments)
             }
 
-            const eachMessage = options?.eachMessage
-            const eachBatch = options?.eachBatch
-            if (eachMessage) {
-              options.eachMessage = wrapKafkaCallback(
-                eachMessage,
-                {
-                  startCh: channels.consumerStart,
-                  commitCh: channels.consumerCommit,
-                  finishCh: channels.consumerFinish,
-                  errorCh: channels.consumerError,
-                },
-                (payload) => {
-                  return {
-                    topic: payload?.topic,
-                    partition: payload?.partition,
-                    offset: payload?.message?.offset,
-                    message: payload?.message,
-                    groupId,
-                  }
-                })
-            } else if (eachBatch) {
-              options.eachBatch = wrapKafkaCallback(
-                eachBatch,
-                {
-                  startCh: channels.batchConsumerStart,
-                  commitCh: channels.batchConsumerCommit,
-                  finishCh: channels.batchConsumerFinish,
-                  errorCh: channels.batchConsumerError,
-                },
-                (payload) => {
-                  const { batch } = payload
-                  return {
-                    topic: batch?.topic,
-                    partition: batch?.partition,
-                    offset: batch?.messages[batch?.messages?.length - 1]?.offset,
-                    messages: batch?.messages,
-                    groupId,
-                  }
-                }
-              )
-            }
-
-            return run.apply(this, arguments)
+            const wrapConsume = (clusterId) => {
+              resolvedClusterId = clusterId
+
+              const eachMessage = options?.eachMessage
+              const eachBatch = options?.eachBatch
+              if (eachMessage) {
+                options.eachMessage = wrapKafkaCallback(
+                  eachMessage,
+                  {
+                    startCh: channels.consumerStart,
+                    commitCh: channels.consumerCommit,
+                    finishCh: channels.consumerFinish,
+                    errorCh: channels.consumerError,
+                  },
+                  (payload) => {
+                    return {
+                      topic: payload?.topic,
+                      partition: payload?.partition,
+                      offset: payload?.message?.offset,
+                      message: payload?.message,
+                      groupId,
+                      clusterId,
+                    }
+                  })
+              } else if (eachBatch) {
+                options.eachBatch = wrapKafkaCallback(
+                  eachBatch,
+                  {
+                    startCh: channels.batchConsumerStart,
+                    commitCh: channels.batchConsumerCommit,
+                    finishCh: channels.batchConsumerFinish,
+                    errorCh: channels.batchConsumerError,
+                  },
+                  (payload) => {
+                    const { batch } = payload
+                    return {
+                      topic: batch?.topic,
+                      partition: batch?.partition,
+                      offset: batch?.messages[batch?.messages?.length - 1]?.offset,
+                      messages: batch?.messages,
+                      groupId,
+                      clusterId,
+                    }
+                  }
+                )
+              }
+
+              return run.apply(this, arguments)
+            }
+
+            if (isPromise(kafkaConsumerClusterIdPromise)) {
+              return kafkaConsumerClusterIdPromise.then((clusterId) => {
+                return wrapConsume(clusterId)
+              })
+            }
+            return wrapConsume(kafkaConsumerClusterIdPromise)
          }
        })
      }
@@ -362,7 +391,10 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
     }
 
     return startCh.runStores(ctx, () => {
-      updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)
+      updateLatestOffset(
+        commitPayload?.topic, commitPayload?.partition, commitPayload?.offset,
+        commitPayload?.groupId, commitPayload?.clusterId
+      )
 
       try {
         const result = callback.apply(this, arguments)
@@ -401,13 +433,14 @@ function getGroupId (config) {
   return ''
 }
 
-function updateLatestOffset (topic, partition, offset, groupId) {
-  const key = `${topic}:${partition}`
+function updateLatestOffset (topic, partition, offset, groupId, clusterId) {
+  const key = `${topic}:${partition}:${clusterId || ''}`
   latestConsumerOffsets.set(key, {
     topic,
     partition,
     offset,
     groupId,
+    clusterId,
   })
 }
 
diff --git a/packages/datadog-instrumentations/src/helpers/kafka.js b/packages/datadog-instrumentations/src/helpers/kafka.js
new file mode 100644
index 00000000000..858ac4f68d4
--- /dev/null
+++ b/packages/datadog-instrumentations/src/helpers/kafka.js
@@ -0,0 +1,47 @@
+'use strict'
+
+/**
+ * Retrieve the Kafka cluster ID from the admin API.
+ * Returns a cached value, a promise, or null if unavailable.
+ *
+ * @param {object} kafka - KafkaJS-compatible Kafka instance
+ * @returns {string|Promise|null}
+ */
+function getKafkaClusterId (kafka) {
+  if (kafka._ddKafkaClusterId) {
+    return kafka._ddKafkaClusterId
+  }
+
+  if (!kafka.admin) {
+    return null
+  }
+
+  const admin = kafka.admin()
+
+  if (!admin.describeCluster) {
+    return null
+  }
+
+  return admin.connect()
+    .then(() => {
+      return admin.describeCluster()
+    })
+    .then((clusterInfo) => {
+      const clusterId = clusterInfo?.clusterId
+      kafka._ddKafkaClusterId = clusterId
+      admin.disconnect()
+      return clusterId
+    })
+    .catch((error) => {
+      throw error
+    })
+}
+
+function isPromise (obj) {
+  return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
+}
+
+module.exports = {
+  getKafkaClusterId,
+  isPromise,
+}
diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js
index e32fcb07dc3..a29aa8df950 100644
--- a/packages/datadog-instrumentations/src/kafkajs.js
+++ b/packages/datadog-instrumentations/src/kafkajs.js
@@ -7,6 +7,7 @@ const {
   channel,
   addHook,
 } = require('./helpers/instrument')
+const { getKafkaClusterId, isPromise } = require('./helpers/kafka')
 
 const producerStartCh = channel('apm:kafkajs:produce:start')
 const producerCommitCh = channel('apm:kafkajs:produce:commit')
@@ -229,36 +230,3 @@ const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId)
     : fn
 }
 
-const getKafkaClusterId = (kafka) => {
-  if (kafka._ddKafkaClusterId) {
-    return kafka._ddKafkaClusterId
-  }
-
-  if (!kafka.admin) {
-    return null
-  }
-
-  const admin = kafka.admin()
-
-  if (!admin.describeCluster) {
-    return null
-  }
-
-  return admin.connect()
-    .then(() => {
-      return admin.describeCluster()
-    })
-    .then((clusterInfo) => {
-      const clusterId = clusterInfo?.clusterId
-      kafka._ddKafkaClusterId = clusterId
-      admin.disconnect()
-      return clusterId
-    })
-    .catch((error) => {
-      throw error
-    })
-}
-
-function isPromise (obj) {
-  return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
-}
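
A note on the shared helper's contract, for review: getKafkaClusterId
deliberately returns either a cached string, a Promise, or null, and
isPromise is the dispatch guard both instrumentations use. A minimal
sketch of that calling convention follows; `kafka` and
`wrapWithClusterId` are stand-in names for this illustration, not
identifiers from the patch:

    const { getKafkaClusterId, isPromise } = require('./helpers/kafka')

    function withClusterId (kafka, wrapWithClusterId) {
      const clusterIdOrPromise = getKafkaClusterId(kafka)
      if (isPromise(clusterIdOrPromise)) {
        // First call: the admin lookup is async; the result is cached on
        // kafka._ddKafkaClusterId so later calls return synchronously.
        return clusterIdOrPromise.then((clusterId) => wrapWithClusterId(clusterId))
      }
      // Cached string, or null when the client has no usable admin API.
      return wrapWithClusterId(clusterIdOrPromise)
    }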