-
Notifications
You must be signed in to change notification settings - Fork 53
Export History job #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Export History job #494
Conversation
…urabletask-dotnet into wangbill/exportfinal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request introduces comprehensive support for exporting orchestration history to Azure Blob Storage through a new ExportHistory library and sample web application. The changes enable users to create batch or continuous export jobs that export completed orchestration instances.
Key changes:
- Adds new
ExportHistorylibrary with entity-based job management, orchestrators for export operations, and Azure Blob Storage integration - Introduces
ExportHistoryWebAppsample with REST API endpoints for managing export jobs - Extends Durable Task client with history streaming and instance ID listing capabilities
- Includes comprehensive test coverage for all models, entities, and client operations
Reviewed Changes
Copilot reviewed 71 out of 71 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
src/ExportHistory/* |
Core library implementation including entities, orchestrators, activities, models, and client abstractions |
test/ExportHistory.Tests/* |
Comprehensive unit tests for models, entities, orchestrations, and client operations |
src/Client/Core/DurableTaskClient.cs |
Adds ListInstanceIdsAsync and StreamInstanceHistoryAsync methods for export support |
src/Client/Grpc/GrpcDurableTaskClient.cs |
Implements gRPC-based history streaming and instance ID listing |
src/Grpc/orchestrator_service.proto |
Adds protobuf definitions for ListInstanceIds RPC endpoint |
samples/ExportHistoryWebApp/* |
Sample ASP.NET Core web app with REST API for export job management |
Microsoft.DurableTask.sln |
Updates solution to include new ExportHistory projects |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/ExportHistory/Orchestrations/ExecuteExportJobOperationOrchestrator.cs
Outdated
Show resolved
Hide resolved
test/ExportHistory.Tests/Client/DefaultExportHistoryJobClientTests.cs
Outdated
Show resolved
Hide resolved
halspang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't gotten through to the tests yet, but wanted to publish what I have so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 71 out of 71 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 71 out of 71 changed files in this pull request and generated 6 comments.
halspang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, things are looking good. Mostly some questions on intent behind why we did a few things/a few alterations to the orchestrations.
| /// <summary> | ||
| /// Represents a request to create a new export job. | ||
| /// </summary> | ||
| public class CreateExportJobRequest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this is in the sample? Shouldn't we provide a standard request for export jobs in the SDK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I now see that it's just for the controller, but I wonder if it's better to have the controller take the options? Just wondering what we want to encourage with users.
| try | ||
| { | ||
| ExportHistoryJobClient jobClient = this.exportHistoryClient.GetJobClient(id); | ||
| await jobClient.DeleteAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From a user perspective, is this deleting an old job, or is this how you stop an ongoing job? If it's for stopping, we should update the API name. If it's strictly like a purge, we can keep it, but maybe use the same terminology from the existing APIs.
| /// <param name="lastInstanceKey">The last fetched instance key.</param> | ||
| /// <param name="cancellation">The cancellation token.</param> | ||
| /// <returns>A page of instance IDs with continuation token.</returns> | ||
| public virtual Task<Page<string>> ListInstanceIdsAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this related to the PR? Also, if we're going to state in the comment that this is for listing with a filter, we should name it less generically than ListInstancesById.
| // Create conversion state to maintain orchestration instance across events | ||
| // This is required for entity-related events (EntityOperationCalled, EntityLockRequested, etc.) | ||
| // which need the parent orchestration instance information from ExecutionStartedEvent | ||
| Microsoft.DurableTask.ProtoUtils.EntityConversionState conversionState = new(insertMissingEntityUnlocks: false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are entities exported in the same way as an orchestration's history?
| try | ||
| { | ||
| moveNextResult = await responseStream.MoveNext(cancellation).ConfigureAwait(false); | ||
| } | ||
| catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) | ||
| { | ||
| throw new OperationCanceledException( | ||
| $"The {nameof(this.StreamInstanceHistoryAsync)} operation was canceled.", e, cancellation); | ||
| } | ||
| catch (RpcException e) when (e.StatusCode == StatusCode.NotFound) | ||
| { | ||
| throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we generally throw the generic exception out? Or should we be wrapping it in something?
| /// <param name="lastInstanceKey">The last fetched instance key.</param> | ||
| /// <param name="cancellation">The cancellation token.</param> | ||
| /// <returns>A page of instance IDs with continuation token.</returns> | ||
| public virtual Task<Page<string>> ListInstanceIdsAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this generic name, if this is designed to always filter by completed time (as per the comment), we should update the method name to reflect that.
Or, if the comment is wrong, update that to have this be more generic. It feels more like a query than a list with all the filters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, how does this differ from GetAllInstancesAsync? Is it just the query/filter ability?
| /// </summary> | ||
| /// <param name="format">The export format.</param> | ||
| /// <returns>The file extension (e.g., "json", "jsonl.gz").</returns> | ||
| static string GetFileExtension(ExportFormat format) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we get a benefit from supporting two formats from the start? I would assume, to save costs, we'd default to the compressed format. Or, does that introduce an issue with things like Kusto ingestion?
Alternatively, are you trying to provide a lower compute by higher storage option?
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="ExportJobCreationOptions"/> class. | ||
| /// </summary> | ||
| /// <param name="mode">The export mode (Batch or Continuous).</param> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought originally we were going to say a continuous one was just an export with no end time? Did we decide to have specific modes just for increased specificity?
| LastInstanceKey: currentState.Checkpoint?.LastInstanceKey, | ||
| MaxInstancesPerBatch: currentState.Config.MaxInstancesPerBatch); | ||
|
|
||
| InstancePage pageResult = await context.CallActivityAsync<InstancePage>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Late to the party, but is this why we do ListInstancesById instead of GetAllInstancesAsync? Because we want to do manual paging? If so, you can disregard my previous comment asking why we do it :)
| } | ||
|
|
||
| // Process batch with retry logic | ||
| BatchExportResult batchResult = await this.ProcessBatchWithRetryAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should stuff like this be an activity for additional checkpointing? Same with the commit.
This pull request introduces a new sample web application,
ExportHistoryWebApp, for managing export history jobs via a REST API. The changes add new project files, configuration files, and controller logic, as well as update the solution file to include the new projects and their build configurations.New Export History Web Application
ExportHistoryWebAppto the solution, including its project file (ExportHistoryWebApp.csproj) and necessary references to Durable Task components and the newExportHistoryproject. [1] [2]ExportJobControllerto provide REST endpoints for creating, retrieving, listing, and deleting export history jobs. This controller uses the Durable Task ExportHistory client and models.CreateExportJobRequestmodel to represent the payload for creating export jobs, supporting various configuration options such as mode, time ranges, storage container, and filtering.Configuration and Environment Setup
launchSettings.json) and application configuration files (appsettings.json,appsettings.Development.json) to support local development and environment variable management for connection strings and storage settings. [1] [2] [3]Program.csto configure dependency injection, Durable Task client and worker services, and controller routing for the web application.API Usage and Documentation
ExportHistoryWebApp.http) with example requests for creating, listing, retrieving, and deleting export jobs, including tips and usage notes for common scenarios.Solution File Updates
Microsoft.DurableTask.slnto include the newExportHistoryandExportHistoryWebAppprojects, and adjusted build configuration and project nesting accordingly. [1] [2] [3] [4]