diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 57740c8a03c..af49d067b5f 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -24,22 +24,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error') const disabledHeaderWeakSet = new WeakSet() -function commitsFromEvent (event) { - const { payload: { groupId, topics } } = event - const commitList = [] - for (const { topic, partitions } of topics) { - for (const { partition, offset } of partitions) { - commitList.push({ - groupId, - partition, - offset, - topic, - }) - } - } - consumerCommitCh.publish(commitList) -} - addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => { class Kafka extends BaseKafka { constructor (options) { @@ -132,6 +116,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf } const kafkaClusterIdPromise = getKafkaClusterId(this) + let resolvedClusterId = null const eachMessageExtractor = (args, clusterId) => { const { topic, partition, message } = args[0] @@ -146,13 +131,31 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const consumer = createConsumer.apply(this, arguments) - consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent) + consumer.on(consumer.events.COMMIT_OFFSETS, (event) => { + const { payload: { groupId: commitGroupId, topics } } = event + const commitList = [] + for (const { topic, partitions } of topics) { + for (const { partition, offset } of partitions) { + commitList.push({ + groupId: commitGroupId, + partition, + offset, + topic, + clusterId: resolvedClusterId, + }) + } + } + consumerCommitCh.publish(commitList) + }) const run = consumer.run const groupId = arguments[0].groupId consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { const wrapConsume = (clusterId) => { + // In kafkajs COMMIT_OFFSETS always happens in the context of one synchronous run + // So this will always reference a correct cluster id + resolvedClusterId = clusterId return run({ eachMessage: wrappedCallback( eachMessage, diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 4638ab0ca8f..d5813ef953a 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { * @returns {ConsumerBacklog} */ transformCommit (commit) { - const { groupId, partition, offset, topic } = commit - return { + const { groupId, partition, offset, topic, clusterId } = commit + const backlog = { partition, topic, type: 'kafka_commit', offset: Number(offset), consumer_group: groupId, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } commit (commitList) { diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 70d0a9bfdd0..17fef7bd3d8 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin { * @param {ProducerResponseItem} response * @returns {ProducerBacklog} */ - transformProduceResponse (response) { + transformProduceResponse (response, clusterId) { // In produce protocol >=v3, the offset key changes from `offset` to `baseOffset` const { topicName: topic, partition, offset, baseOffset } = response const offsetAsLong = offset || baseOffset - return { + const backlog = { type: 'kafka_produce', partition, offset: offsetAsLong ? Number(offsetAsLong) : undefined, topic, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } /** @@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { */ commit (ctx) { const commitList = ctx.result + const clusterId = ctx.clusterId if (!this.config.dsmEnabled) return if (!commitList || !Array.isArray(commitList)) return @@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { 'offset', 'topic', ] - for (const commit of commitList.map(this.transformProduceResponse)) { + for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) { if (keys.some(key => !commit.hasOwnProperty(key))) continue this.tracer.setOffset(commit) } diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 685fb2f8498..62e8134ee0d 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -219,18 +219,27 @@ describe('Plugin', () => { assert.strictEqual(runArg?.offset, commitMeta.offset) assert.strictEqual(runArg?.partition, commitMeta.partition) assert.strictEqual(runArg?.topic, commitMeta.topic) - assertObjectContains(runArg, { + const expectedBacklog = { type: 'kafka_commit', consumer_group: 'test-group', - }) + } + // kafka_cluster_id is only available in kafkajs >=1.13 + if (clusterIdAvailable) { + expectedBacklog.kafka_cluster_id = testKafkaClusterId + } + assertObjectContains(runArg, expectedBacklog) }) } it('Should add backlog on producer response', async () => { await sendMessages(kafka, testTopic, messages) sinon.assert.calledOnce(setOffsetSpy) - const { topic } = setOffsetSpy.lastCall.args[0] - assert.strictEqual(topic, testTopic) + const runArg = setOffsetSpy.lastCall.args[0] + assert.strictEqual(runArg.topic, testTopic) + // kafka_cluster_id is only available in kafkajs >=1.13 + if (clusterIdAvailable) { + assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId) + } }) }) })