Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
addHook,
channel,
} = require('./helpers/instrument')
const { getKafkaClusterId, isPromise } = require('./helpers/kafka')

// Create channels for Confluent Kafka JavaScript
const channels = {
Expand Down Expand Up @@ -195,6 +196,8 @@

// Wrap the producer method if it exists
if (typeof kafka?.producer === 'function') {
const kafkaProducerClusterIdPromise = getKafkaClusterId(kafka)

shimmer.wrap(kafka, 'producer', function wrapProducerMethod (producerMethod) {
return function wrappedProducerMethod () {
const producer = producerMethod.apply(this, arguments)
Expand All @@ -211,48 +214,58 @@
return send.apply(this, arguments)
}

const ctx = {
topic: payload?.topic,
messages: payload?.messages || [],
bootstrapServers: kafka._ddBrokers,
disableHeaderInjection: disabledHeaderWeakSet.has(producer),
}

return channels.producerStart.runStores(ctx, () => {
try {
const result = send.apply(this, arguments)
const wrappedSendWithClusterId = (clusterId) => {
const ctx = {
topic: payload?.topic,
messages: payload?.messages || [],
bootstrapServers: kafka._ddBrokers,
disableHeaderInjection: disabledHeaderWeakSet.has(producer),
clusterId,
}

result.then((res) => {
ctx.result = res
channels.producerCommit.publish(ctx)
channels.producerFinish.publish(ctx)
}, (err) => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error(
// eslint-disable-next-line @stylistic/max-len
'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.'
)
return channels.producerStart.runStores(ctx, () => {
try {
const result = send.apply(this, arguments)

result.then((res) => {
ctx.result = res
channels.producerCommit.publish(ctx)
channels.producerFinish.publish(ctx)
}, (err) => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error(
// eslint-disable-next-line @stylistic/max-len
'Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). Please look at broker logs for more information. Tracer message header injection for Kafka is disabled.'
)
}
ctx.error = err
channels.producerError.publish(ctx)
}
ctx.error = err
channels.producerError.publish(ctx)
}
channels.producerFinish.publish(ctx)
})

return result
} catch (e) {
ctx.error = e
channels.producerError.publish(ctx)
channels.producerFinish.publish(ctx)
})

return result
} catch (e) {
ctx.error = e
channels.producerError.publish(ctx)
channels.producerFinish.publish(ctx)
throw e
}
})
throw e
}
})
}

if (isPromise(kafkaProducerClusterIdPromise)) {
return kafkaProducerClusterIdPromise.then((clusterId) => {
return wrappedSendWithClusterId(clusterId)
})
}
return wrappedSendWithClusterId(kafkaProducerClusterIdPromise)
}
})
}
Expand All @@ -264,10 +277,13 @@

// Wrap the consumer method if it exists
if (typeof kafka?.consumer === 'function') {
const kafkaConsumerClusterIdPromise = getKafkaClusterId(kafka)

shimmer.wrap(kafka, 'consumer', function wrapConsumerMethod (consumerMethod) {
return function wrappedConsumerMethod (config) {
const consumer = consumerMethod.apply(this, arguments)
const groupId = getGroupId(config)
let resolvedClusterId = null

Check failure on line 286 in packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js

View workflow job for this annotation

GitHub Actions / lint

'resolvedClusterId' is assigned a value but never used

// Wrap the run method for handling message consumption
if (typeof consumer?.run === 'function') {
Expand All @@ -277,49 +293,62 @@
return run.apply(this, arguments)
}

const eachMessage = options?.eachMessage
const eachBatch = options?.eachBatch
if (eachMessage) {
options.eachMessage = wrapKafkaCallback(
eachMessage,
{
startCh: channels.consumerStart,
commitCh: channels.consumerCommit,
finishCh: channels.consumerFinish,
errorCh: channels.consumerError,
},
(payload) => {
return {
topic: payload?.topic,
partition: payload?.partition,
offset: payload?.message?.offset,
message: payload?.message,
groupId,
}
})
} else if (eachBatch) {
options.eachBatch = wrapKafkaCallback(
eachBatch,
{
startCh: channels.batchConsumerStart,
commitCh: channels.batchConsumerCommit,
finishCh: channels.batchConsumerFinish,
errorCh: channels.batchConsumerError,
},
(payload) => {
const { batch } = payload
return {
topic: batch?.topic,
partition: batch?.partition,
offset: batch?.messages[batch?.messages?.length - 1]?.offset,
messages: batch?.messages,
groupId,
const wrapConsume = (clusterId) => {
resolvedClusterId = clusterId

const eachMessage = options?.eachMessage
const eachBatch = options?.eachBatch
if (eachMessage) {
options.eachMessage = wrapKafkaCallback(
eachMessage,
{
startCh: channels.consumerStart,
commitCh: channels.consumerCommit,
finishCh: channels.consumerFinish,
errorCh: channels.consumerError,
},
(payload) => {
return {
topic: payload?.topic,
partition: payload?.partition,
offset: payload?.message?.offset,
message: payload?.message,
groupId,
clusterId,
}
})
} else if (eachBatch) {
options.eachBatch = wrapKafkaCallback(
eachBatch,
{
startCh: channels.batchConsumerStart,
commitCh: channels.batchConsumerCommit,
finishCh: channels.batchConsumerFinish,
errorCh: channels.batchConsumerError,
},
(payload) => {
const { batch } = payload
return {
topic: batch?.topic,
partition: batch?.partition,
offset: batch?.messages[batch?.messages?.length - 1]?.offset,
messages: batch?.messages,
groupId,
clusterId,
}
}
}
)
)
}

return run.apply(this, arguments)
}

return run.apply(this, arguments)
if (isPromise(kafkaConsumerClusterIdPromise)) {
return kafkaConsumerClusterIdPromise.then((clusterId) => {
return wrapConsume(clusterId)
})
}
return wrapConsume(kafkaConsumerClusterIdPromise)
}
})
}
Expand Down Expand Up @@ -362,7 +391,10 @@
}

return startCh.runStores(ctx, () => {
updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)
updateLatestOffset(
commitPayload?.topic, commitPayload?.partition, commitPayload?.offset,
commitPayload?.groupId, commitPayload?.clusterId
)

try {
const result = callback.apply(this, arguments)
Expand Down Expand Up @@ -401,13 +433,14 @@
return ''
}

function updateLatestOffset (topic, partition, offset, groupId) {
const key = `${topic}:${partition}`
function updateLatestOffset (topic, partition, offset, groupId, clusterId) {
  // Record the most recently seen consumer offset. The key includes the
  // cluster id so identically named topic/partitions on different clusters
  // do not overwrite each other; a missing cluster id maps to ''.
  const offsetKey = `${topic}:${partition}:${clusterId || ''}`
  const record = {
    topic,
    partition,
    offset,
    groupId,
    clusterId,
  }
  latestConsumerOffsets.set(offsetKey, record)
}

Expand Down
47 changes: 47 additions & 0 deletions packages/datadog-instrumentations/src/helpers/kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

/**
 * Retrieve the Kafka cluster ID from the admin API.
 *
 * Returns the cached value when a previous lookup already stored it on the
 * kafka instance, `null` when the client exposes no usable admin API, or a
 * promise resolving to the cluster ID otherwise. The result is cached on
 * `kafka._ddKafkaClusterId` so the admin round-trip happens at most once.
 *
 * @param {object} kafka - KafkaJS-compatible Kafka instance
 * @returns {string|Promise<string>|null}
 */
function getKafkaClusterId (kafka) {
  if (kafka._ddKafkaClusterId) {
    return kafka._ddKafkaClusterId
  }

  if (!kafka.admin) {
    return null
  }

  const admin = kafka.admin()

  if (!admin.describeCluster) {
    return null
  }

  // Best-effort cleanup: never let a failing disconnect reject the returned
  // promise or surface as an unhandled rejection.
  const disconnectQuietly = () => {
    try {
      Promise.resolve(admin.disconnect()).catch(() => {})
    } catch {
      // ignore synchronous cleanup failures
    }
  }

  return admin.connect()
    .then(() => admin.describeCluster())
    .then((clusterInfo) => {
      const clusterId = clusterInfo?.clusterId
      kafka._ddKafkaClusterId = clusterId
      return clusterId
    })
    // Disconnect on both success and failure; the original leaked the admin
    // connection when connect()/describeCluster() rejected.
    .finally(disconnectQuietly)
}

// A value is treated as a promise (thenable) when it is a non-null object or
// function exposing a callable `then` — mirrors the Promises/A+ duck-typing.
function isPromise (obj) {
  if (obj === null || obj === undefined) {
    return false
  }
  const isObjectLike = typeof obj === 'object' || typeof obj === 'function'
  return isObjectLike && typeof obj.then === 'function'
}

// Shared Kafka helpers used by both the kafkajs and
// confluentinc-kafka-javascript instrumentations.
module.exports = {
  getKafkaClusterId,
  isPromise,
}
34 changes: 1 addition & 33 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
channel,
addHook,
} = require('./helpers/instrument')
const { getKafkaClusterId, isPromise } = require('./helpers/kafka')

const producerStartCh = channel('apm:kafkajs:produce:start')
const producerCommitCh = channel('apm:kafkajs:produce:commit')
Expand Down Expand Up @@ -228,37 +229,4 @@
}
: fn
}

Check failure on line 232 in packages/datadog-instrumentations/src/kafkajs.js

View workflow job for this annotation

GitHub Actions / lint

Too many blank lines at the end of file. Max of 0 allowed
const getKafkaClusterId = (kafka) => {
if (kafka._ddKafkaClusterId) {
return kafka._ddKafkaClusterId
}

if (!kafka.admin) {
return null
}

const admin = kafka.admin()

if (!admin.describeCluster) {
return null
}

return admin.connect()
.then(() => {
return admin.describeCluster()
})
.then((clusterInfo) => {
const clusterId = clusterInfo?.clusterId
kafka._ddKafkaClusterId = clusterId
admin.disconnect()
return clusterId
})
.catch((error) => {
throw error
})
}

function isPromise (obj) {
return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
}
Loading