Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 54 additions & 90 deletions packages/datadog-instrumentations/src/ai.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,11 @@

const { channel, tracingChannel } = require('dc-polyfill')
const shimmer = require('../../datadog-shimmer')
const { addHook } = require('./helpers/instrument')

const toolCreationChannel = channel('dd-trace:vercel-ai:tool')

// Exports of the `ai` module that get patched, keyed by export name.
// The value is the wrapper factory applied to that export.
const TRACED_FUNCTIONS = Object.fromEntries([
  ['generateText', wrapWithTracer],
  ['streamText', wrapWithTracer],
  ['generateObject', wrapWithTracer],
  ['streamObject', wrapWithTracer],
  ['embed', wrapWithTracer],
  ['embedMany', wrapWithTracer],
  ['tool', wrapTool],
])
const { addHook, getHooks } = require('./helpers/instrument')

const vercelAiTracingChannel = tracingChannel('dd-trace:vercel-ai')
const vercelAiSpanSetAttributesChannel = channel('dd-trace:vercel-ai:span:setAttributes')

/**
 * Creates a span stand-in that records nothing: every mutator returns the span
 * itself so calls can be chained, `isRecording()` is always `false`, and the
 * span context carries empty ids.
 */
function createNoopSpan () {
  return {
    spanContext () {
      return { traceId: '', spanId: '', traceFlags: 0 }
    },
    setAttribute () {
      return this
    },
    setAttributes () {
      return this
    },
    addEvent () {
      return this
    },
    addLink () {
      return this
    },
    addLinks () {
      return this
    },
    setStatus () {
      return this
    },
    updateName () {
      return this
    },
    end () {
      return this
    },
    isRecording () {
      return false
    },
    recordException () {
      return this
    },
  }
}

/**
 * Minimal tracer exposing only `startActiveSpan`. It hands a fresh no-op span
 * to the callback passed as the last argument and returns the callback's result.
 */
const noopTracer = {
  startActiveSpan (...args) {
    const callback = args[args.length - 1]

    return callback(createNoopSpan())
  },
}

const tracers = new WeakSet()

function wrapTracer (tracer) {
Expand All @@ -63,29 +29,35 @@ function wrapTracer (tracer) {

arguments[arguments.length - 1] = shimmer.wrapFunction(cb, function (originalCb) {
return function (span) {
shimmer.wrap(span, 'end', function (spanEnd) {
// the below is necessary in the case that the span is vercel ai's noopSpan.
// while we don't want to patch the noopSpan more than once, we do want to treat each as a
// fresh instance. However, this is really not necessary for non-noop spans, but not sure
// how to differentiate.
const freshSpan = Object.create(span) // TODO: does this cause memory leaks?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the note here. I'm not sure this would really leak: each wrapper is created 1:1 with an already-created span, and since those spans are garbage-collected anyway, the wrappers should be as well.

I also thought about detecting the no-op span via a WeakSet (potentially with just one entry), since that is the only case where we should see the same span reference here, and cloning only in that case. Otherwise, if the span comes from a custom or real OpenTelemetry tracer, every span should be unique.


shimmer.wrap(freshSpan, 'end', function (spanEnd) {
return function () {
vercelAiTracingChannel.asyncEnd.publish(ctx)
return spanEnd.apply(this, arguments)
}
})

shimmer.wrap(span, 'setAttributes', function (setAttributes) {
shimmer.wrap(freshSpan, 'setAttributes', function (setAttributes) {
return function (attributes) {
vercelAiSpanSetAttributesChannel.publish({ ctx, attributes })
return setAttributes.apply(this, arguments)
}
})

shimmer.wrap(span, 'recordException', function (recordException) {
shimmer.wrap(freshSpan, 'recordException', function (recordException) {
return function (exception) {
ctx.error = exception
vercelAiTracingChannel.error.publish(ctx)
return recordException.apply(this, arguments)
}
})

return originalCb.apply(this, arguments)
return originalCb.call(this, freshSpan)
}
})

Expand All @@ -98,58 +70,50 @@ function wrapTracer (tracer) {
})
}

function wrapWithTracer (fn) {
return function () {
const options = arguments[0]

const experimentalTelemetry = options.experimental_telemetry
if (experimentalTelemetry?.isEnabled === false) {
return fn.apply(this, arguments)
}

if (experimentalTelemetry == null) {
options.experimental_telemetry = { isEnabled: true, tracer: noopTracer }
} else {
experimentalTelemetry.isEnabled = true
experimentalTelemetry.tracer ??= noopTracer
}

wrapTracer(options.experimental_telemetry.tracer)

return fn.apply(this, arguments)
for (const hook of getHooks('ai')) {
if (hook.file === 'dist/index.js') {
// if not removed, the below hook will never match correctly
// however, it is still needed in the orchestrion definition
hook.file = null
}
}

/**
 * Wraps `tool` so that the first argument of every call is published on
 * `toolCreationChannel` before the call is forwarded unchanged.
 *
 * @param {Function} tool - function to wrap
 * @returns {Function} wrapper that forwards `this` and all arguments and returns `tool`'s result
 */
function wrapTool (tool) {
  return function (...callArgs) {
    toolCreationChannel.publish(callArgs[0])

    return tool.apply(this, callArgs)
  }
}

// CJS exports
addHook({
name: 'ai',
versions: ['>=4.0.0'],
}, exports => {
for (const [fnName, patchingFn] of Object.entries(TRACED_FUNCTIONS)) {
exports = shimmer.wrap(exports, fnName, patchingFn, { replaceGetter: true })
}
addHook(hook, exports => {
const getTracerChannel = tracingChannel('orchestrion:ai:getTracer')
getTracerChannel.subscribe({
end (ctx) {
const { arguments: args, result: tracer } = ctx
const { isEnabled } = args[0] ?? {}

return exports
})

// ESM exports
addHook({
name: 'ai',
versions: ['>=4.0.0'],
file: 'dist/index.mjs',
}, exports => {
for (const [fnName, patchingFn] of Object.entries(TRACED_FUNCTIONS)) {
exports = shimmer.wrap(exports, fnName, patchingFn, { replaceGetter: true })
}
if (isEnabled !== false) {
wrapTracer(tracer)
}
},
})

/**
* We patch this function to ensure that the telemetry attributes/tags are set always,
* even when telemetry options are not specified. This is to ensure easy use of this integration.
*
* If it is explicitly disabled, however, we will not change the options.
*/
const selectTelemetryAttributesChannel = tracingChannel('orchestrion:ai:selectTelemetryAttributes')
selectTelemetryAttributesChannel.subscribe({
start (ctx) {
const { arguments: args } = ctx
const options = args[0]

if (options.telemetry?.isEnabled !== false) {
args[0] = {
...options,
telemetry: {
...options.telemetry,
isEnabled: true,
},
}
}
},
})

return exports
})
return exports
})
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, it seems we didn't support different version ranges in the instrumentation definition file. I think I could also have just put the instrumentation definitions in reverse order (>=6.0.0, then >=4.0.0 <6.0.0), whereas now it is flipped. However, that's an implementation detail that could be hard to discover, especially when supporting newer versions later and just appending them after the existing entries.

Happy to do that instead, though.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const instrumentations = require('./instrumentations')

const NODE_OPTIONS = getEnvironmentVariable('NODE_OPTIONS')

/** @type {Record<string, Set<string>>} map of module base name to supported function query versions */
const supported = {}
const disabled = new Set()

Expand Down Expand Up @@ -104,19 +105,21 @@ function disable (instrumentation) {
/**
 * Checks whether the package that owns `filename` has a version matching `versions`.
 *
 * The package directory is taken to be the part of `filename` that precedes `filePath`,
 * and the version is read from the `package.json` in that directory. Ranges that matched
 * are remembered per package directory in `supported`, so several version ranges can be
 * tracked for the same package instead of a single cached boolean.
 *
 * @param {string} filename - path of the file being checked
 * @param {string} filePath - trailing portion of `filename` that follows the package directory
 * @param {string} versions - semver range the package version must satisfy
 * @returns {boolean} `true` when the package version satisfies `versions`
 */
function satisfies (filename, filePath, versions) {
  const [basename] = filename.split(filePath)

  supported[basename] ??= new Set()
  const matchedRanges = supported[basename]

  if (!matchedRanges.has(versions)) {
    try {
      const pkg = JSON.parse(readFileSync(
        join(basename, 'package.json'), 'utf8'
      ))

      if (semifies(pkg.version, versions)) {
        matchedRanges.add(versions)
      }
    } catch {
      // Best effort: a missing or unreadable package.json means the range is not matched.
    }
  }

  return matchedRanges.has(versions)
}

// TODO: Support index
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict'

// Orchestrion instrumentation definitions for the `ai` package.
// Each entry names a function (`functionQuery`) inside one file of the package (`module.filePath`),
// limited to a version range, together with the channel name it is reported under.
// Every function is listed twice: once for `dist/index.js` and once for `dist/index.mjs`.
// NOTE(review): subscribers elsewhere appear to use `orchestrion:ai:<channelName>` - confirm the prefix convention.
module.exports = [
// getTracer - for patching tracer (all supported versions, synchronous)
{
module: {
name: 'ai',
versionRange: '>=4.0.0',
filePath: 'dist/index.js',
},
functionQuery: {
functionName: 'getTracer',
kind: 'Sync',
},
channelName: 'getTracer',
},
{
module: {
name: 'ai',
versionRange: '>=4.0.0',
filePath: 'dist/index.mjs',
},
functionQuery: {
functionName: 'getTracer',
kind: 'Sync',
},
channelName: 'getTracer',
},
// selectTelemetryAttributes - makes sure we set isEnabled properly
// Declared as `Sync` for >=4.0.0 <6.0.0 and as `Async` for >=6.0.0, hence one pair of entries per range.
{
module: {
name: 'ai',
versionRange: '>=4.0.0 <6.0.0',
filePath: 'dist/index.js',
},
functionQuery: {
functionName: 'selectTelemetryAttributes',
kind: 'Sync',
},
channelName: 'selectTelemetryAttributes',
},
{
module: {
name: 'ai',
versionRange: '>=4.0.0 <6.0.0',
filePath: 'dist/index.mjs',
},
functionQuery: {
functionName: 'selectTelemetryAttributes',
kind: 'Sync',
},
channelName: 'selectTelemetryAttributes',
},
// Same function and channel as above, but queried as `Async` from 6.0.0 onwards.
{
module: {
name: 'ai',
versionRange: '>=6.0.0',
filePath: 'dist/index.js',
},
functionQuery: {
functionName: 'selectTelemetryAttributes',
kind: 'Async',
},
channelName: 'selectTelemetryAttributes',
},
{
module: {
name: 'ai',
versionRange: '>=6.0.0',
filePath: 'dist/index.mjs',
},
functionQuery: {
functionName: 'selectTelemetryAttributes',
kind: 'Async',
},
channelName: 'selectTelemetryAttributes',
},
// tool - the `tool` function, reported on the `tool` channel (all supported versions, synchronous)
{
module: {
name: 'ai',
versionRange: '>=4.0.0',
filePath: 'dist/index.js',
},
functionQuery: {
functionName: 'tool',
kind: 'Sync',
},
channelName: 'tool',
},
{
module: {
name: 'ai',
versionRange: '>=4.0.0',
filePath: 'dist/index.mjs',
},
functionQuery: {
functionName: 'tool',
kind: 'Sync',
},
channelName: 'tool',
},
]
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

module.exports = [
...require('./langchain'),
...require('./ai'),
...require('./bullmq'),
...require('./langchain'),
]
10 changes: 7 additions & 3 deletions packages/datadog-plugin-ai/test/index.spec.js
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertions removed from this file no longer apply: we no longer add a tracer or modify isEnabled on the options of each individual ai operation, and instead patch via getTracer.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ const { NODE_MAJOR } = require('../../../version')
const range = NODE_MAJOR < 22 ? '>=4.0.2' : '>=4.0.0'

/**
 * Picks the `@ai-sdk/openai` package specifier to use with a given `ai` version.
 *
 * @param {string} vercelAiVersion - version of the `ai` package under test
 * @returns {string} unpinned `@ai-sdk/openai` for `ai` >=6.0.0, `@ai-sdk/openai@2.0.0`
 *   for >=5.0.0, and `@ai-sdk/openai@1.3.23` for anything older
 */
function getAiSdkOpenAiPackage (vercelAiVersion) {
  // Checked from newest to oldest so the first matching range wins.
  if (semifies(vercelAiVersion, '>=6.0.0')) {
    return '@ai-sdk/openai'
  }

  if (semifies(vercelAiVersion, '>=5.0.0')) {
    return '@ai-sdk/openai@2.0.0'
  }

  return '@ai-sdk/openai@1.3.23'
}

// making a different reference from the default no-op tracer in the instrumentation
Expand Down Expand Up @@ -116,7 +122,6 @@ describe('Plugin', () => {
})

assert.ok(result.text, 'Expected result to be truthy')
assert.ok(experimentalTelemetry.tracer != null, 'Tracer should be set when `isEnabled` is true')

await checkTraces
})
Expand Down Expand Up @@ -157,7 +162,6 @@ describe('Plugin', () => {
})

assert.ok(result.text, 'Expected result to be truthy')
assert.ok(experimentalTelemetry.isEnabled, 'isEnabled should be set to true')
assert.ok(experimentalTelemetry.tracer === myTracer, 'Tracer should be set when `isEnabled` is true')

await checkTraces
Expand Down
8 changes: 5 additions & 3 deletions packages/dd-trace/src/llmobs/plugins/ai/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { channel } = require('dc-polyfill')
const BaseLLMObsPlugin = require('../base')
const { getModelProvider } = require('../../../../../datadog-plugin-ai/src/utils')

const toolCreationCh = channel('dd-trace:vercel-ai:tool')
const toolCreationCh = channel('tracing:orchestrion:ai:tool:start')
const setAttributesCh = channel('dd-trace:vercel-ai:span:setAttributes')

const { MODEL_NAME, MODEL_PROVIDER, NAME } = require('../../constants/tags')
Expand Down Expand Up @@ -94,8 +94,10 @@ class VercelAILLMObsPlugin extends BaseLLMObsPlugin {

this.#toolCallIdsToName = {}
this.#availableTools = new Set()
toolCreationCh.subscribe(toolArgs => {
this.#availableTools.add(toolArgs)
toolCreationCh.subscribe(ctx => {
const toolArgs = ctx.arguments
const tool = toolArgs[0] ?? {}
this.#availableTools.add(tool)
})

setAttributesCh.subscribe(({ ctx, attributes }) => {
Expand Down
Loading
Loading