Dirigent is a deterministic workflow engine for executing YAML-based agent workflow DSLs on the JVM (Kotlin).
- Event-Driven Execution - Workflows triggered by Server-Sent Events (SSE) with automatic routing
- REST API + SSE Streaming - Query workflows/instances via HTTP, monitor execution in real-time
- Instance Tracking - Complete execution history with step timing, status, and error tracking
- Explicit State Management - State tracked as JSON objects, no hidden globals
- Bounded Execution - Safety limits prevent infinite loops (max 200 steps per workflow)
- Output Validation - LLM steps validate against declared schemas and custom expressions
- Hot Reload - Filesystem workflows automatically reload on changes
- Concurrent Execution - Configurable bounded concurrency for workflow execution
Runs API server + SSE consumer for production use:
# SSE_URL is a required positional argument
./gradlew run --args='http://localhost:8080/events/tasks'
# With filesystem workflows (hot reload enabled):
./gradlew run --args='http://localhost:8080/events/tasks --workflow-dir=/path/to/workflows'
# With custom API port and concurrency:
./gradlew run --args='http://localhost:8080/events/tasks --port=9090 --max-concurrent=20'
Or with the standalone jar:
java -jar build/dirigent-0.1.0.jar http://localhost:8080/events/tasks --port=9090
Features:
- REST API server for querying workflows and instances
- SSE consumer connects to external event source
- Event-driven workflow execution with trigger matching
- Real-time instance tracking and execution events
- Automatic reconnection with exponential backoff
- Last-Event-ID resumption support
- Graceful shutdown with workflow completion
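The reconnection behaviour listed above can be illustrated with a small delay calculation. This is only a sketch of capped exponential backoff in general: the function name `backoffDelayMs`, the 1 second base delay, and the 30 second cap are assumptions, since this README does not state the values the SSE consumer actually uses.

```kotlin
import kotlin.math.min

// Hypothetical helper: base delay and cap are illustrative, not Dirigent's real settings.
fun backoffDelayMs(attempt: Int, baseMs: Long = 1_000, maxMs: Long = 30_000): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    // Limit the exponent so the multiplication stays well inside Long range.
    val factor = 1L shl min(attempt, 20)
    // Double the delay on every failed attempt, but never exceed the cap.
    return min(baseMs * factor, maxMs)
}
```

With these assumed defaults, the first retry waits 1 second, the second 2 seconds, and every attempt from the sixth onward waits the capped 30 seconds.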
Options:
- SSE_URL (positional, required): SSE endpoint URL, e.g., http://localhost:8080/events/tasks
- -p, --port INT: HTTP API server port (default: 8080)
- --workflow-dir PATH: Directory path for filesystem workflows (uses classpath /workflows if not set)
- --max-concurrent INT: Max concurrent workflow executions (default: 10)
API Endpoints (available at http://localhost:8080/api/v1/):
- GET /workflows - List all loaded workflows with metadata
- GET /workflows/{name} - Get workflow definition YAML
- POST /workflows/{name}/execute - Execute workflow with input data (returns 202 Accepted with instance ID)
- GET /instances - List workflow instances (with filtering/pagination)
- GET /instances/{id} - Get detailed instance execution state
- GET /stats - Dispatcher statistics and metrics
- GET /events - SSE stream of workflow execution events
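As a rough illustration of calling the execute endpoint from Kotlin, the sketch below builds the request with the JDK's `java.net.http` API. The helper name `buildExecuteRequest` is hypothetical, and the JSON body shape is an assumption: the endpoint list only says the call takes "input data".

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds (but does not send) a POST to /api/v1/workflows/{name}/execute.
// Assumption: the server accepts a JSON body; its exact schema is not documented here.
fun buildExecuteRequest(baseUrl: String, workflow: String, jsonBody: String): HttpRequest =
    HttpRequest.newBuilder()
        .uri(URI.create("${baseUrl.trimEnd('/')}/api/v1/workflows/$workflow/execute"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build()
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; per the list above, a successful call should return 202 Accepted along with an instance ID.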
Runs a sample workflow once with hardcoded input (for testing):
./gradlew run --args='--demo'
Uses classpath workflows and sample tools (stubs). No API server or SSE consumer.
Build a standalone jar with all dependencies:
make # Build jar (default target)
make dist # Same as above
This creates build/dirigent-<version>.jar which can be run with:
java -jar build/dirigent-0.1.0.jar
Other make targets:
make test # Run all tests
make version # Display current version
make clean # Clean build artifacts
Or use Gradle directly:
./gradlew build # Build the project
./gradlew test # Run all tests
./gradlew clean build # Clean build
Step kinds:
- llm - LLM call with structured output validation
- tool - Tool invocation (HTTP, DB, calculations, etc.)
- switch - Conditional branching using expressions
- fail - Explicit workflow failure
Control flow:
- Explicit goto: goto: step_id
- Switch branching: when expressions with goto targets and default fallback
- Termination: end: true or fail steps
- Error handling: on_error: {goto: error_step}
name: triage_and_execute
version: 1
description: Classify and route incoming requests
# Event trigger (SSE mode)
triggers:
  - type: task.created
    when: "priority == 'high'"
start: classify
steps:
  classify:
    kind: llm
    description: Classify user intent
    tool: classify_input
    out:
      intent: string
      confidence: number
    validate:
      - "confidence >= 0.7"
    on_error:
      goto: ask_clarification
  route:
    kind: switch
    cases:
      - when: "intent == 'do_task'"
        goto: do_task
    default: ask_clarification
  do_task:
    kind: tool
    tool: run_task
    args:
      intent: "{{intent}}"
      payload: "{{input.text}}"
    end: true
  ask_clarification:
    kind: tool
    tool: ask_user
    args:
      question: "Could you clarify?"
    end: true
Workflow Execution (com.ninjacontrol.dirigent):
- Model.kt - DSL data model (Workflow, StepDef, Trigger)
- WorkflowExecutor.kt - Deterministic execution engine
- WorkflowValidator.kt - Pre-execution validation
- WorkflowLoader.kt - YAML parser
- ExprEngine.kt - Sandboxed expression evaluator
- TemplateEngine.kt - {{variable}} substitution
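To make the {{variable}} substitution concrete, here is a minimal sketch of how such placeholders could be resolved against explicit state. It is not the code in TemplateEngine.kt: the function name `render`, the use of plain nested maps instead of Jackson nodes, and the choice to fail on unresolved placeholders are all assumptions made for illustration.

```kotlin
// Matches {{name}} or {{dotted.path}}, allowing optional spaces inside the braces.
val placeholder = Regex("""\{\{\s*([A-Za-z_][\w.]*)\s*\}\}""")

// Hypothetical helper: replaces each placeholder with the value found in `state`.
fun render(template: String, state: Map<String, Any?>): String =
    placeholder.replace(template) { m ->
        val path = m.groupValues[1]
        // Walk nested maps one segment at a time; a missing segment yields null.
        val value = path.split('.').fold<String, Any?>(state) { node, key ->
            (node as? Map<*, *>)?.get(key)
        }
        // Assumption: an unresolved placeholder is an error rather than an empty string.
        value?.toString() ?: throw IllegalArgumentException("unresolved placeholder: $path")
    }
```

For example, `render("{{input.text}}", mapOf("input" to mapOf("text" to "hi")))` yields `hi`. Failing loudly on a missing value is one way to stay in line with an "explicit state, no implicit behavior" design, but the real engine may handle this differently.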
Tools (com.ninjacontrol.dirigent.tools):
- Tool.kt - Functional interface: suspend fun(ObjectNode) -> ObjectNode
- ToolRegistry.kt - Allowlist-based registration
- SampleTools.kt - Demo tools (classify, ask, execute)
- NotificationTools.kt - Logging tools (info, warn, error, debug)
- DebugTools.kt - Testing tools (delay, log_info, etc.)
Event-Driven Dispatch (com.ninjacontrol.dirigent.dispatch):
- WorkflowDispatcher.kt - Event-to-workflow router with bounded concurrency
- WorkflowRepository.kt - Workflow loading interface
- ClasspathWorkflowRepository.kt - Classpath loader
- FilesystemWorkflowRepository.kt - Filesystem loader with hot reload
Events (com.ninjacontrol.dirigent.events):
- Event.kt - Generic event data model
- EventSource.kt - Pluggable event source interface
- EventBus.kt - In-memory pub-sub bus
- EventRouter.kt - Multi-source event router with error isolation
- SseEventSource.kt - SSE client with resilience and configurable filtering
- SseParser.kt - SSE protocol parser
REST API (com.ninjacontrol.dirigent.api):
- ApiServer.kt - Ktor-based HTTP server
- ApiModels.kt - Request/response DTOs
- routes/ - API endpoint handlers (workflows, instances, stats, events)
Instance Tracking (com.ninjacontrol.dirigent.instance):
- WorkflowInstance.kt - Instance data model and status enum
- WorkflowInstanceRegistry.kt - In-memory instance storage with TTL
- WorkflowExecutionEvent.kt - Execution event hierarchy (started, step completed, etc.)
- WorkflowExecutionEventBus.kt - Real-time event streaming with filtering
From AGENTS.md:
Non-negotiable:
- Deterministic control flow (no LLM-driven branching)
- Explicit state (no hidden globals)
- Bounded execution (finite workflows)
- Minimal surface area (intentionally limited DSL)
Prohibited:
- Dynamic code execution
- LLM output determining step order
- Unbounded loops or recursion
- Magic defaults or implicit behavior
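The "bounded execution" principle can be sketched as a step loop with a hard cap. The `Step` type and `runBounded` function below are hypothetical simplifications (the real StepDef and executor are not shown in this README); only the limit of 200 steps comes from the feature list.

```kotlin
// Hypothetical, simplified step: the real StepDef in Model.kt carries more fields.
data class Step(val next: String?, val end: Boolean = false)

// Follows explicit `next` links from `start`, refusing to run past `maxSteps`.
fun runBounded(steps: Map<String, Step>, start: String, maxSteps: Int = 200): List<String> {
    val visited = mutableListOf<String>()
    var current: String? = start
    while (current != null) {
        val id: String = current
        // A cyclic workflow fails here instead of looping forever.
        check(visited.size < maxSteps) { "step limit of $maxSteps exceeded" }
        val step = steps[id] ?: error("unknown step: $id")
        visited += id
        current = if (step.end) null else step.next
    }
    return visited
}
```

Because the next step is always read from the workflow definition rather than from tool or LLM output, the order of execution stays deterministic, and a workflow that jumps back to itself stops with an error once the cap is reached.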
src/main/kotlin/com/ninjacontrol/dirigent/
├── Model.kt # DSL data model
├── WorkflowExecutor.kt # Execution engine
├── WorkflowValidator.kt # Validation logic
├── WorkflowLoader.kt # YAML parser
├── ExprEngine.kt # Expression evaluator
├── TemplateEngine.kt # Template substitution
├── Main.kt # Application entry point
├── tools/
│ ├── Tool.kt # Tool interface
│ ├── ToolRegistry.kt # Tool registration
│ ├── SampleTools.kt # Demo tools
│ ├── NotificationTools.kt # Logging tools
│ └── DebugTools.kt # Testing tools
├── dispatch/
│ ├── WorkflowDispatcher.kt # Event router
│ └── WorkflowRepository.kt # Workflow loading
├── events/
│ ├── Event.kt # Event model
│ ├── EventSource.kt # Event source interface
│ ├── EventBus.kt # Event bus
│ ├── EventRouter.kt # Multi-source router
│ └── SseEventSource.kt # SSE client
├── api/
│ ├── ApiServer.kt # HTTP server
│ ├── ApiModels.kt # DTOs
│ └── routes/ # API endpoints
└── instance/
    ├── WorkflowInstance.kt # Instance model
    ├── WorkflowInstanceRegistry.kt
    ├── WorkflowExecutionEvent.kt
    └── WorkflowExecutionEventBus.kt
src/main/resources/workflows/
├── sample.yaml # Demo workflow
├── on_task_created_security.yaml
└── on_task_status_ongoing.yaml
src/test/kotlin/com/ninjacontrol/dirigent/
├── WorkflowExecutorTest.kt
├── ExprEngineTest.kt
├── dispatch/
│ └── WorkflowDispatcherTest.kt
└── api/
    └── ApiServerTest.kt
Small, sandboxed evaluator for branching and validation:
Supported:
- Literals: strings, numbers, booleans, null
- Variables: dotted paths (confidence, input.text)
- Boolean operators: &&, ||, !
- Comparisons: ==, !=, <, <=, >, >=
Intentionally limited:
- No function calls
- No dynamic evaluation
- No arithmetic operators (yet)
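To show what evaluating such an expression against explicit state might involve, here is a deliberately tiny sketch that handles a single comparison of the form `operand op operand`. It is not ExprEngine.kt: the function names are hypothetical, state is modelled as nested maps, the boolean operators are left out, and string literals containing spaces are not supported.

```kotlin
// Resolve a dotted path such as "input.text" against nested maps; null if a segment is missing.
fun resolve(state: Map<String, Any?>, path: String): Any? {
    var current: Any? = state
    for (key in path.split('.')) {
        current = (current as? Map<*, *>)?.get(key) ?: return null
    }
    return current
}

// Parse one operand: null, a boolean, a 'quoted string', a number, or else a variable path.
fun operand(token: String, state: Map<String, Any?>): Any? = when {
    token == "null" -> null
    token == "true" -> true
    token == "false" -> false
    token.length >= 2 && token.startsWith("'") && token.endsWith("'") ->
        token.substring(1, token.length - 1)
    token.toDoubleOrNull() != null -> token.toDouble()
    else -> resolve(state, token)
}

// Evaluate "<operand> <op> <operand>" for the six comparison operators.
fun compare(expr: String, state: Map<String, Any?>): Boolean {
    val match = Regex("""^\s*([^\s=!<>]+)\s*(==|!=|<=|>=|<|>)\s*(\S+)\s*$""").matchEntire(expr)
        ?: throw IllegalArgumentException("unsupported expression: $expr")
    val (l, op, r) = match.destructured
    val left = operand(l, state)
    val right = operand(r, state)
    // Numbers are compared as doubles so that 1 and 1.0 are treated as equal.
    val ln = (left as? Number)?.toDouble()
    val rn = (right as? Number)?.toDouble()
    return when (op) {
        "==" -> if (ln != null && rn != null) ln == rn else left == right
        "!=" -> if (ln != null && rn != null) ln != rn else left != right
        else -> {
            // Ordering is only defined for numbers in this sketch.
            if (ln == null || rn == null) return false
            when (op) { "<" -> ln < rn; "<=" -> ln <= rn; ">" -> ln > rn; else -> ln >= rn }
        }
    }
}
```

There is no function-call or dynamic-evaluation path in this sketch: anything that does not match the single comparison pattern is rejected, which is the same kind of restriction the list above describes.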
Examples:
validate:
  - "confidence >= 0.7"
  - "status != null"
when: "priority == 'high' && confidence > 0.8"
For SSE mode:
Create a YAML file in your workflow directory:
name: my_workflow
version: 1
description: What this workflow does
triggers:
  - type: event.type
    when: "data.field == 'value'"
start: first_step
steps:
  # ... step definitions
Workflows automatically reload when files change (filesystem mode only).
For demo mode:
Add the workflow to src/main/resources/workflows/ and update Main.kt.
Edit SampleTools.kt or create new tool files:
// objectMapper is assumed to be a Jackson ObjectMapper defined elsewhere (not shown here).
suspend fun myTool(input: ObjectNode): ObjectNode {
    val value = input.get("param").asText()
    // ... tool logic
    return objectMapper.createObjectNode().apply {
        put("result", value)
    }
}
// Register in registry
fun registry(): ToolRegistry = ToolRegistry(
    mapOf(
        "my_tool" to ::myTool,
        // ... other tools
    )
)
Tools must:
- Accept ObjectNode input
- Return ObjectNode output
- Be suspending functions (for async operations)
make test # Run all tests
./gradlew test --info # Detailed output
./gradlew test --tests WorkflowExecutorTest # Specific test
- Language: Kotlin 2.3.0 (JVM 17)
- Build: Gradle 8.13 with Shadow plugin
- Web Framework: Ktor (HTTP server + SSE client)
- Dependencies:
- Jackson (YAML/JSON parsing and serialization)
- Kotlinx Coroutines (async execution and structured concurrency)
- SLF4J + Logback (logging with MDC support)
- Testing: JUnit 5, Kotest, coroutines-test
MIT