-
Notifications
You must be signed in to change notification settings - Fork 372
Onzia/azure durable functions #7535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7b5d236
567e285
2e426b9
5a4402e
b187a80
d87a3d3
75be9e8
34af747
3f0dfe4
7dae872
4532834
f251956
055c331
36a1d9b
a6654cd
38dad4c
e24b38b
33246ee
371aa84
02c07c4
cb7654b
06f902d
e547255
e7def9d
101cd10
3b44957
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| 'use strict' | ||
|
|
||
| const dc = require('dc-polyfill') | ||
| const shimmer = require('../../datadog-shimmer') | ||
|
|
||
| const { | ||
| addHook, | ||
| } = require('./helpers/instrument') | ||
|
|
||
| /** | ||
| * @type {import('diagnostics_channel').TracingChannel} | ||
| */ | ||
| const azureDurableFunctionsChannel = dc.tracingChannel('datadog:azure:durable-functions:invoke') | ||
|
|
||
| addHook({ name: 'durable-functions', versions: ['>=3'], patchDefault: false }, (df) => { | ||
| const { app } = df | ||
|
|
||
| shimmer.wrap(app, 'entity', entityWrapper) | ||
| shimmer.wrap(app, 'activity', activityHandler) | ||
|
|
||
| return df | ||
| }) | ||
|
|
||
| function entityWrapper (method) { | ||
| return function (entityName, arg) { | ||
| // because this method is overloaded, the second argument can either be an object | ||
| // with the handler or the handler itself, so first we figure which type it is | ||
|
|
||
| if (typeof arg === 'function') { | ||
| // if a function, this is the handler we want to wrap and trace | ||
| arguments[1] = shimmer.wrapFunction(arg, handler => entityHandler(handler, entityName, method.name)) | ||
| } else { | ||
| // if an object, access the handler then trace it | ||
| shimmer.wrap(arg, 'handler', handler => entityHandler(handler, entityName, method.name)) | ||
| } | ||
|
|
||
| return method.apply(this, arguments) | ||
| } | ||
| } | ||
|
|
||
| function entityHandler (handler, entityName, methodName) { | ||
| return function () { | ||
| const entityContext = arguments[0] | ||
| return azureDurableFunctionsChannel.traceSync( | ||
| handler, | ||
| { trigger: 'Entity', functionName: entityName, operationName: entityContext?.df?.operationName }, | ||
| this, ...arguments) | ||
| } | ||
| } | ||
|
|
||
| function activityHandler (method) { | ||
| return function (activityName, activityOptions) { | ||
| shimmer.wrap(activityOptions, 'handler', handler => { | ||
| const isAsync = | ||
| handler && handler.constructor && handler.constructor.name === 'AsyncFunction' | ||
|
|
||
| return function () { | ||
| // use tracePromise if this is an async handler. otherwise, use traceSync | ||
| return isAsync | ||
| ? azureDurableFunctionsChannel.tracePromise( | ||
| handler, | ||
| { trigger: 'Activity', functionName: activityName }, | ||
| this, ...arguments) | ||
| : azureDurableFunctionsChannel.traceSync( | ||
| handler, | ||
| { trigger: 'Activity', functionName: activityName }, | ||
| this, ...arguments) | ||
| } | ||
| }) | ||
| return method.apply(this, arguments) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,11 +6,16 @@ const { | |
| addHook, | ||
| } = require('./helpers/instrument') | ||
|
|
||
| let alreadyPatched = false | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does adding this functionality to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm looking into it. Ideally patchDefault should prevent the patching of the defaultExport.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was not able to replicate it by running the client.spec.js in the http-test folder. I might need some more context on how the double patching was identified. Does removing that conditional make the tests fail?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pablo and I talked offline but I'm answering here for record. This is happening in the durable-functions tests not azure functions. Using esm imports produces an extra span. This is being addressed in #7601 |
||
|
|
||
| const azureFunctionsChannel = dc.tracingChannel('datadog:azure:functions:invoke') | ||
|
|
||
| addHook({ name: '@azure/functions', versions: ['>=4'], patchDefault: false }, (azureFunction) => { | ||
| const { app } = azureFunction | ||
|
|
||
| if (alreadyPatched) return azureFunction | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for some reason, when using esm imports, azure functions gets patched twice, resulting in extra spans. hence I added this. Why does this happen?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For ESM we always patch both the default export and the outer export. In some cases, the default export and the outer export end up triggering a patch on the same object or function. For example, we observed this behavior when a patch was applied to the prototype of an object that was shared by both the default export and the outer export.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I solved this in service bus by using a weakmap like this. I'm not sure if that's the approach we want to take here though.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Me and @pabloerhard talked and it seems this may be due to how durable-functions is importing @azure/functions under the hood using require. He's working on a fix in #7601 |
||
| alreadyPatched = true | ||
|
|
||
| // Http triggers | ||
| shimmer.wrap(app, 'deleteRequest', wrapHandler) | ||
| shimmer.wrap(app, 'http', wrapHandler) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| 'use strict' | ||
|
|
||
| const TracingPlugin = require('../../dd-trace/src/plugins/tracing') | ||
|
|
||
| class AzureDurableFunctionsPlugin extends TracingPlugin { | ||
| static get id () { return 'azure-durable-functions' } | ||
| static get operation () { return 'invoke' } | ||
| static get prefix () { return 'tracing:datadog:azure:durable-functions:invoke' } | ||
| static get type () { return 'serverless' } | ||
| static get kind () { return 'server' } | ||
|
|
||
| bindStart (ctx) { | ||
| const span = this.startSpan(this.operationName(), { | ||
| kind: 'internal', | ||
| type: 'serverless', | ||
|
|
||
| meta: { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if you have this in a doc or not, but I'd double check we have all the metadata we need and compare it with a normal function. I imagine a lot of the Azure tags won't be necessary for durable functions, but we should make sure there is as much in common as possible. |
||
| component: 'azure-functions', | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The plugin starts spans for Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, are we considering these to be the same component as standard azure functions or should we make this have a more specific component tag for durable functions? |
||
| 'aas.function.name': ctx.functionName, | ||
| 'aas.function.trigger': ctx.trigger, | ||
| 'resource.name': `${ctx.trigger} ${ctx.functionName}`, | ||
| }, | ||
| }, ctx) | ||
|
|
||
| // in the case of entity functions, operationName should be available | ||
| if (ctx.operationName) { | ||
| span.setTag('aas.function.operation', ctx.operationName) | ||
| span.setTag('resource.name', `${ctx.trigger} ${ctx.functionName} ${ctx.operationName}` | ||
| ) | ||
| } | ||
|
|
||
| ctx.span = span | ||
| return ctx.currentStore | ||
| } | ||
|
|
||
| end (ctx) { | ||
| // We only want to run finish here if this is a synchronous operation | ||
| // Only synchronous operations would have `result` or `error` on `end` | ||
| // So we skip operations that dont | ||
| if (!ctx.hasOwnProperty('result') && !ctx.hasOwnProperty('error')) return | ||
| super.finish(ctx) | ||
| } | ||
|
|
||
| asyncEnd (ctx) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it seems strange, but it is better to make this
|
||
| super.finish(ctx) | ||
| } | ||
| } | ||
|
|
||
| module.exports = AzureDurableFunctionsPlugin | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| { | ||
| "version": "2.0", | ||
| "extensionBundle": { | ||
| "id": "Microsoft.Azure.Functions.ExtensionBundle", | ||
| "version": "[4.0.0, 4.28.0)" | ||
| }, | ||
| "extensions": { | ||
| "durableTask": { | ||
| "storageProvider": { | ||
| "type": "AzureStorage" | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| { | ||
| "IsEncrypted": false, | ||
| "Values": { | ||
| "FUNCTIONS_WORKER_RUNTIME": "node", | ||
| "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", | ||
| "AzureWebJobsStorage": "UseDevelopmentStorage=true" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| { | ||
| "name": "azure-durable-functions-tests", | ||
| "version": "1.0.0", | ||
| "description": "", | ||
| "main": "./server.mjs", | ||
| "scripts": { | ||
| "start": "func start" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| 'use strict' | ||
|
|
||
| const assert = require('node:assert/strict') | ||
|
|
||
| const { spawn } = require('child_process') | ||
| const { describe, it } = require('mocha') | ||
| const { | ||
| FakeAgent, | ||
| hookFile, | ||
| sandboxCwd, | ||
| useSandbox, | ||
| curlAndAssertMessage, | ||
| } = require('../../../../integration-tests/helpers') | ||
| const { withVersions } = require('../../../dd-trace/test/setup/mocha') | ||
|
|
||
| describe('esm', () => { | ||
| let agent | ||
| let proc | ||
|
|
||
| withVersions('azure-durable-functions', 'durable-functions', version => { | ||
| useSandbox([ | ||
| `durable-functions@${version}`, | ||
| '@azure/functions', | ||
| 'azure-functions-core-tools@4', | ||
| ], | ||
| false, | ||
| ['./packages/datadog-plugin-azure-durable-functions/test/integration-test/*', | ||
| './packages/datadog-plugin-azure-durable-functions/test/fixtures/*', | ||
| ]) | ||
|
|
||
| beforeEach(async () => { | ||
| agent = await new FakeAgent().start() | ||
| }) | ||
|
|
||
| afterEach(async () => { | ||
| // after each test, kill process and wait for exit before continuing | ||
| if (proc) { | ||
| proc.kill('SIGINT') | ||
| await new Promise(resolve => proc.on('exit', resolve)) | ||
| } | ||
| await agent.stop() | ||
| }) | ||
|
|
||
| it('is instrumented', async () => { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a separate test for entity and non-entity functions or is this sufficient? I believe there is separate logic in the case of an entity function and we should probably capture that. |
||
| proc = await spawnPluginIntegrationTestProc(agent.port) | ||
| return await curlAndAssertMessage(agent, 'http://127.0.0.1:7071/api/httptest', ({ headers, payload }) => { | ||
| assert.strictEqual(headers.host, `127.0.0.1:${agent.port}`) | ||
| assert.ok(Array.isArray(payload)) | ||
|
|
||
| // should expect spans for http.request, activity.hola, entity.counter.add_n, entity.counter.get_count | ||
| assert.strictEqual(payload.length, 4) | ||
|
|
||
| for (const maybeArray of payload) { | ||
| assert.ok(Array.isArray(maybeArray)) | ||
| } | ||
|
|
||
| const [maybeHttpSpan, maybeHolaActivity, maybeAddNEntity, maybeGetCountEntity] = payload | ||
|
|
||
| assert.strictEqual(maybeHttpSpan.length, 2) | ||
| assert.strictEqual(maybeHttpSpan[0].resource, 'GET /api/httptest') | ||
|
|
||
| assert.strictEqual(maybeHolaActivity.length, 1) | ||
| assert.strictEqual(maybeHolaActivity[0].resource, 'Activity hola') | ||
| assert.strictEqual(maybeHolaActivity[0].name, 'azure.durable-functions.invoke') | ||
|
|
||
| assert.strictEqual(maybeAddNEntity.length, 1) | ||
| assert.strictEqual(maybeAddNEntity[0].resource, 'Entity Counter add_n') | ||
| assert.strictEqual(maybeAddNEntity[0].name, 'azure.durable-functions.invoke') | ||
|
|
||
| assert.strictEqual(maybeGetCountEntity.length, 1) | ||
| assert.strictEqual(maybeGetCountEntity[0].resource, 'Entity Counter get_count') | ||
| assert.strictEqual(maybeGetCountEntity[0].name, 'azure.durable-functions.invoke') | ||
| }) | ||
| }).timeout(60_000) | ||
| }) | ||
| }) | ||
|
|
||
| /** | ||
| * - spawns process for azure func start commands | ||
| * - connects to azurite (running in container) | ||
| * then runs the durable function locally | ||
| */ | ||
| async function spawnPluginIntegrationTestProc (agentPort) { | ||
| const cwd = sandboxCwd() | ||
| const env = { | ||
| NODE_OPTIONS: `--loader=${hookFile}`, | ||
| DD_TRACE_AGENT_PORT: agentPort, | ||
| DD_TRACE_DISABLED_PLUGINS: 'amqplib,amqp10,rhea,net', | ||
| PATH: `${cwd}/node_modules/azure-functions-core-tools/bin:${process.env.PATH}`, | ||
| } | ||
|
|
||
| const options = { cwd, env } | ||
|
|
||
| const proc = await spawnProc('func', ['start'], options) | ||
| return proc | ||
| } | ||
|
|
||
| function spawnProc (command, args, options = {}) { | ||
| const proc = spawn(command, args, { ...options, stdio: 'pipe' }) | ||
| return new Promise((resolve, reject) => { | ||
| proc | ||
| .on('error', reject) | ||
| .on('exit', code => { | ||
| if (code !== 0) { | ||
| reject(new Error(`Process exited with status code ${code}.`)) | ||
| } | ||
| resolve() | ||
| }) | ||
|
|
||
| proc.stdout.on('data', data => { | ||
| // eslint-disable-next-line no-console | ||
| if (!options.silent) console.log(data.toString()) | ||
|
|
||
| if (data.toString().includes('Host lock lease acquired by instance')) { | ||
| resolve(proc) | ||
| } | ||
| }) | ||
|
|
||
| proc.stderr.on('data', data => { | ||
| // eslint-disable-next-line no-console | ||
| if (!options.silent) console.error(data.toString()) | ||
| }) | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The activity wrapper chooses
traceSyncunlesshandler.constructor.name === 'AsyncFunction', which misses valid handlers that return a Promise but are not declared withasync(for example transpiled async functions or plain functions returningPromise). In that case the span is finished onendbefore the Promise settles, so latency and rejection errors are not captured for the activity execution.Useful? React with 👍 / 👎.