diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index 0583bc212e5..bcbff32333f 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -7,6 +7,7 @@ const { addHook, channel, } = require('./helpers/instrument') +const { getKafkaClusterId, isPromise } = require('./helpers/kafka') // Create channels for Confluent Kafka JavaScript const channels = { @@ -195,6 +196,8 @@ function instrumentKafkaJS (kafkaJS) { // Wrap the producer method if it exists if (typeof kafka?.producer === 'function') { + const kafkaProducerClusterIdPromise = getKafkaClusterId(kafka) + shimmer.wrap(kafka, 'producer', function wrapProducerMethod (producerMethod) { return function wrappedProducerMethod () { const producer = producerMethod.apply(this, arguments) @@ -211,48 +214,58 @@ function instrumentKafkaJS (kafkaJS) { return send.apply(this, arguments) } - const ctx = { - topic: payload?.topic, - messages: payload?.messages || [], - bootstrapServers: kafka._ddBrokers, - disableHeaderInjection: disabledHeaderWeakSet.has(producer), - } - - return channels.producerStart.runStores(ctx, () => { - try { - const result = send.apply(this, arguments) + const wrappedSendWithClusterId = (clusterId) => { + const ctx = { + topic: payload?.topic, + messages: payload?.messages || [], + bootstrapServers: kafka._ddBrokers, + disableHeaderInjection: disabledHeaderWeakSet.has(producer), + clusterId, + } - result.then((res) => { - ctx.result = res - channels.producerCommit.publish(ctx) - channels.producerFinish.publish(ctx) - }, (err) => { - if (err) { - // Fixes bug where we would inject message headers for kafka brokers - // that don't support headers (version <0.11). On the error, we disable - // header injection. Tnfortunately the error name / type is not more specific. 
- This approach is implemented by other tracers as well. - if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') { - disabledHeaderWeakSet.add(producer) - log.error( - // eslint-disable-next-line @stylistic/max-len - 'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.' + return channels.producerStart.runStores(ctx, () => { + try { + const result = send.apply(this, arguments) + + result.then((res) => { + ctx.result = res + channels.producerCommit.publish(ctx) + channels.producerFinish.publish(ctx) + }, (err) => { + if (err) { + // Fixes bug where we would inject message headers for kafka brokers + // that don't support headers (version <0.11). On the error, we disable + // header injection. Unfortunately the error name / type is not more specific. + // This approach is implemented by other tracers as well. + if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') { + disabledHeaderWeakSet.add(producer) + log.error( + // eslint-disable-next-line @stylistic/max-len + 'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.' 
+ ) + } + ctx.error = err + channels.producerError.publish(ctx) } - ctx.error = err - channels.producerError.publish(ctx) - } + channels.producerFinish.publish(ctx) + }) + + return result + } catch (e) { + ctx.error = e + channels.producerError.publish(ctx) channels.producerFinish.publish(ctx) - }) - - return result - } catch (e) { - ctx.error = e - channels.producerError.publish(ctx) - channels.producerFinish.publish(ctx) - throw e - } - }) + throw e + } + }) + } + + if (isPromise(kafkaProducerClusterIdPromise)) { + return kafkaProducerClusterIdPromise.then((clusterId) => { + return wrappedSendWithClusterId(clusterId) + }) + } + return wrappedSendWithClusterId(kafkaProducerClusterIdPromise) } }) } @@ -264,10 +277,13 @@ function instrumentKafkaJS (kafkaJS) { // Wrap the consumer method if it exists if (typeof kafka?.consumer === 'function') { + const kafkaConsumerClusterIdPromise = getKafkaClusterId(kafka) + shimmer.wrap(kafka, 'consumer', function wrapConsumerMethod (consumerMethod) { return function wrappedConsumerMethod (config) { const consumer = consumerMethod.apply(this, arguments) const groupId = getGroupId(config) + let resolvedClusterId = null // Wrap the run method for handling message consumption if (typeof consumer?.run === 'function') { @@ -277,49 +293,62 @@ function instrumentKafkaJS (kafkaJS) { return run.apply(this, arguments) } - const eachMessage = options?.eachMessage - const eachBatch = options?.eachBatch - if (eachMessage) { - options.eachMessage = wrapKafkaCallback( - eachMessage, - { - startCh: channels.consumerStart, - commitCh: channels.consumerCommit, - finishCh: channels.consumerFinish, - errorCh: channels.consumerError, - }, - (payload) => { - return { - topic: payload?.topic, - partition: payload?.partition, - offset: payload?.message?.offset, - message: payload?.message, - groupId, - } - }) - } else if (eachBatch) { - options.eachBatch = wrapKafkaCallback( - eachBatch, - { - startCh: channels.batchConsumerStart, - commitCh: 
channels.batchConsumerCommit, - finishCh: channels.batchConsumerFinish, - errorCh: channels.batchConsumerError, - }, - (payload) => { - const { batch } = payload - return { - topic: batch?.topic, - partition: batch?.partition, - offset: batch?.messages[batch?.messages?.length - 1]?.offset, - messages: batch?.messages, - groupId, + const wrapConsume = (clusterId) => { + resolvedClusterId = clusterId + + const eachMessage = options?.eachMessage + const eachBatch = options?.eachBatch + if (eachMessage) { + options.eachMessage = wrapKafkaCallback( + eachMessage, + { + startCh: channels.consumerStart, + commitCh: channels.consumerCommit, + finishCh: channels.consumerFinish, + errorCh: channels.consumerError, + }, + (payload) => { + return { + topic: payload?.topic, + partition: payload?.partition, + offset: payload?.message?.offset, + message: payload?.message, + groupId, + clusterId, + } + }) + } else if (eachBatch) { + options.eachBatch = wrapKafkaCallback( + eachBatch, + { + startCh: channels.batchConsumerStart, + commitCh: channels.batchConsumerCommit, + finishCh: channels.batchConsumerFinish, + errorCh: channels.batchConsumerError, + }, + (payload) => { + const { batch } = payload + return { + topic: batch?.topic, + partition: batch?.partition, + offset: batch?.messages[batch?.messages?.length - 1]?.offset, + messages: batch?.messages, + groupId, + clusterId, + } } - } - ) + ) + } + + return run.apply(this, arguments) } - return run.apply(this, arguments) + if (isPromise(kafkaConsumerClusterIdPromise)) { + return kafkaConsumerClusterIdPromise.then((clusterId) => { + return wrapConsume(clusterId) + }) + } + return wrapConsume(kafkaConsumerClusterIdPromise) } }) } @@ -362,7 +391,10 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh }, } return startCh.runStores(ctx, () => { - updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId) + updateLatestOffset( + commitPayload?.topic, 
commitPayload?.partition, commitPayload?.offset, + commitPayload?.groupId, commitPayload?.clusterId + ) try { const result = callback.apply(this, arguments) @@ -401,13 +433,14 @@ function getGroupId (config) { return '' } -function updateLatestOffset (topic, partition, offset, groupId) { - const key = `${topic}:${partition}` +function updateLatestOffset (topic, partition, offset, groupId, clusterId) { + const key = `${topic}:${partition}:${clusterId || ''}` latestConsumerOffsets.set(key, { topic, partition, offset, groupId, + clusterId, }) } diff --git a/packages/datadog-instrumentations/src/helpers/kafka.js b/packages/datadog-instrumentations/src/helpers/kafka.js new file mode 100644 index 00000000000..858ac4f68d4 --- /dev/null +++ b/packages/datadog-instrumentations/src/helpers/kafka.js @@ -0,0 +1,47 @@ +'use strict' + +/** + * Retrieve the Kafka cluster ID from the admin API. + * Returns a cached value, a promise, or null if unavailable. + * + * @param {object} kafka - KafkaJS-compatible Kafka instance + * @returns {string|Promise|null} + */ +function getKafkaClusterId (kafka) { + if (kafka._ddKafkaClusterId) { + return kafka._ddKafkaClusterId + } + + if (!kafka.admin) { + return null + } + + const admin = kafka.admin() + + if (!admin.describeCluster) { + return null + } + + return admin.connect() + .then(() => { + return admin.describeCluster() + }) + .then((clusterInfo) => { + const clusterId = clusterInfo?.clusterId + kafka._ddKafkaClusterId = clusterId + admin.disconnect() + return clusterId + }) + .catch((error) => { + throw error + }) +} + +function isPromise (obj) { + return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function' +} + +module.exports = { + getKafkaClusterId, + isPromise, +} diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 57740c8a03c..a29aa8df950 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ 
b/packages/datadog-instrumentations/src/kafkajs.js @@ -7,6 +7,7 @@ const { channel, addHook, } = require('./helpers/instrument') +const { getKafkaClusterId, isPromise } = require('./helpers/kafka') const producerStartCh = channel('apm:kafkajs:produce:start') const producerCommitCh = channel('apm:kafkajs:produce:commit') @@ -24,22 +25,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error') const disabledHeaderWeakSet = new WeakSet() -function commitsFromEvent (event) { - const { payload: { groupId, topics } } = event - const commitList = [] - for (const { topic, partitions } of topics) { - for (const { partition, offset } of partitions) { - commitList.push({ - groupId, - partition, - offset, - topic, - }) - } - } - consumerCommitCh.publish(commitList) -} - addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => { class Kafka extends BaseKafka { constructor (options) { @@ -132,6 +117,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf } const kafkaClusterIdPromise = getKafkaClusterId(this) + let resolvedClusterId = null const eachMessageExtractor = (args, clusterId) => { const { topic, partition, message } = args[0] @@ -146,13 +132,29 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const consumer = createConsumer.apply(this, arguments) - consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent) + consumer.on(consumer.events.COMMIT_OFFSETS, (event) => { + const { payload: { groupId: commitGroupId, topics } } = event + const commitList = [] + for (const { topic, partitions } of topics) { + for (const { partition, offset } of partitions) { + commitList.push({ + groupId: commitGroupId, + partition, + offset, + topic, + clusterId: resolvedClusterId, + }) + } + } + consumerCommitCh.publish(commitList) + }) const run = consumer.run const groupId = arguments[0].groupId consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) { const 
wrapConsume = (clusterId) => { + resolvedClusterId = clusterId return run({ eachMessage: wrappedCallback( eachMessage, @@ -228,36 +230,3 @@ const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) : fn } -const getKafkaClusterId = (kafka) => { - if (kafka._ddKafkaClusterId) { - return kafka._ddKafkaClusterId - } - - if (!kafka.admin) { - return null - } - - const admin = kafka.admin() - - if (!admin.describeCluster) { - return null - } - - return admin.connect() - .then(() => { - return admin.describeCluster() - }) - .then((clusterInfo) => { - const clusterId = clusterInfo?.clusterId - kafka._ddKafkaClusterId = clusterId - admin.disconnect() - return clusterId - }) - .catch((error) => { - throw error - }) -} - -function isPromise (obj) { - return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function' -} diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 4638ab0ca8f..d5813ef953a 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { * @returns {ConsumerBacklog} */ transformCommit (commit) { - const { groupId, partition, offset, topic } = commit - return { + const { groupId, partition, offset, topic, clusterId } = commit + const backlog = { partition, topic, type: 'kafka_commit', offset: Number(offset), consumer_group: groupId, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } commit (commitList) { diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 70d0a9bfdd0..17fef7bd3d8 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin { * @param {ProducerResponseItem} response * @returns 
{ProducerBacklog} */ - transformProduceResponse (response) { + transformProduceResponse (response, clusterId) { // In produce protocol >=v3, the offset key changes from `offset` to `baseOffset` const { topicName: topic, partition, offset, baseOffset } = response const offsetAsLong = offset || baseOffset - return { + const backlog = { type: 'kafka_produce', partition, offset: offsetAsLong ? Number(offsetAsLong) : undefined, topic, } + if (clusterId) { + backlog.kafka_cluster_id = clusterId + } + return backlog } /** @@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { */ commit (ctx) { const commitList = ctx.result + const clusterId = ctx.clusterId if (!this.config.dsmEnabled) return if (!commitList || !Array.isArray(commitList)) return @@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { 'offset', 'topic', ] - for (const commit of commitList.map(this.transformProduceResponse)) { + for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) { if (keys.some(key => !commit.hasOwnProperty(key))) continue this.tracer.setOffset(commit) } diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 685fb2f8498..6c527f2b791 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -219,18 +219,25 @@ describe('Plugin', () => { assert.strictEqual(runArg?.offset, commitMeta.offset) assert.strictEqual(runArg?.partition, commitMeta.partition) assert.strictEqual(runArg?.topic, commitMeta.topic) - assertObjectContains(runArg, { + const expectedBacklog = { type: 'kafka_commit', consumer_group: 'test-group', - }) + } + if (clusterIdAvailable) { + expectedBacklog.kafka_cluster_id = testKafkaClusterId + } + assertObjectContains(runArg, expectedBacklog) }) } it('Should add backlog on producer response', async () => { await sendMessages(kafka, testTopic, messages) sinon.assert.calledOnce(setOffsetSpy) - 
const { topic } = setOffsetSpy.lastCall.args[0] - assert.strictEqual(topic, testTopic) + const runArg = setOffsetSpy.lastCall.args[0] + assert.strictEqual(runArg.topic, testTopic) + if (clusterIdAvailable) { + assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId) + } }) }) })