🚀 DialogChain is a powerful and extensible framework for building, managing, and deploying dialog systems and conversational AI applications. It supports multiple programming languages and integrates with various NLP and ML models.
- Multi-language Support: Write processors in Python, JavaScript, or any language with gRPC support
- Extensible Architecture: Easily add new input sources, processors, and output destinations
- Asynchronous Processing: Built on asyncio for high-performance dialog processing
- YAML Configuration: Define dialog flows and processing pipelines with simple YAML files
- Built-in Processors: Includes common NLP and ML model integrations
- Monitoring & Logging: Comprehensive logging and metrics out of the box
- REST & gRPC APIs: Easy integration with other services
- Docker Support: Containerized deployment options
- Python 3.8+
- Poetry (for development)
- Docker (optional, for containerized deployment)
pip install dialogchain

Or install from source:

git clone https://github.com/dialogchain/python
cd python
poetry install

- Create a simple dialog configuration in config.yaml:
version: "1.0"
pipeline:
  - name: greeting
    type: python
    module: dialogchain.processors.basic
    class: GreetingProcessor
    config:
      default_name: "User"

- Run the dialog server:
dialogchain serve config.yaml

- Send a request:
curl -X POST http://localhost:8000/process -H "Content-Type: application/json" -d '{"text": "Hello!"}'

For detailed documentation, please visit our documentation site.
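The quickstart pipeline above refers to a `GreetingProcessor` class. The exact base class and hook names are defined by DialogChain itself and are not shown in this README; purely as an illustration, a configurable, class-based processor of the same general shape might look like this (hypothetical sketch, not DialogChain's actual API):

```python
# Hypothetical sketch only -- the class name, process() hook, and config handling
# below are illustrative assumptions, not DialogChain's documented interface.
class GreetingProcessor:
    def __init__(self, config=None):
        # "default_name" mirrors the config key used in config.yaml above
        self.default_name = (config or {}).get("default_name", "User")

    async def process(self, message: dict) -> dict:
        name = message.get("name", self.default_name)
        return {"text": f"Hello, {name}!"}
```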
DialogChain includes a robust logging system with the following features:
- Multiple Handlers: Console and file logging out of the box
- Structured Logs: JSON-formatted logs for easy parsing
- SQLite Storage: Logs are stored in a searchable database
- Log Rotation: Automatic log rotation to prevent disk space issues
- Thread-Safe: Safe for use in multi-threaded applications
from dialogchain.utils.logger import setup_logger, get_logs
# Get a logger instance
logger = setup_logger(__name__, log_level='DEBUG')
# Log messages with different levels
logger.debug('Debug message')
logger.info('Information message')
logger.warning('Warning message')
logger.error('Error message', extra={'error_code': 500})
# Get recent logs from database
recent_logs = get_logs(limit=10)

DialogChain provides several make commands for log management:
# View recent logs (default: 50 lines)
make logs
# View specific number of log lines
make logs LINES=100
# Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
make log-level LEVEL=DEBUG
# View database logs
make log-db LIMIT=50
# Follow log file in real-time
make log-tail
# Clear log files
make log-clear

Logging can be configured via environment variables:
# Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
LOG_LEVEL=DEBUG
# Log file path
LOG_FILE=logs/dialogchain.log
# Database log file path
DB_LOG_FILE=logs/dialogchain.db

Log files are automatically rotated when they reach 10MB, keeping up to 5 backup files.
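If you want the same rotation policy (10 MB per file, 5 backups) in your own logging setup rather than going through `setup_logger`, the standard library's `RotatingFileHandler` provides it. A minimal sketch; the log path simply mirrors the `LOG_FILE` default above:

```python
import logging
import os
from logging.handlers import RotatingFileHandler

os.makedirs("logs", exist_ok=True)

handler = RotatingFileHandler(
    "logs/dialogchain.log",     # same path as the LOG_FILE example above
    maxBytes=10 * 1024 * 1024,  # rotate at 10 MB
    backupCount=5,              # keep up to 5 backup files
)
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))

logger = logging.getLogger("my_app")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.info("Rotating at 10 MB with 5 backups")
```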
dialogchain/
├── src/
│   └── dialogchain/
│       ├── __init__.py
│       ├── engine.py        # Core processing engine
│       ├── processors/      # Built-in processors
│       ├── connectors/      # I/O connectors
│       └── utils/           # Utility functions
├── tests/                   # Test suite
├── examples/                # Example configurations
└── docs/                    # Documentation
Run the complete test suite:
make test

Run specific test types:
# Unit tests
make test-unit
# Integration tests
make test-integration
# End-to-end tests
make test-e2e

Generate test coverage report:

make coverage

We welcome contributions! Please see our Contributing Guidelines for details.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
For support, please open an issue in the GitHub repository.
DialogChain includes a comprehensive test suite to ensure code quality and functionality. Here's how to run the tests and view logs:
Run the complete test suite:
make test

Or run specific test types:
# Unit tests
make test-unit
# Integration tests
make test-integration
# End-to-end tests
make test-e2e

Generate a coverage report to see which parts of your code are being tested:

make coverage

This will generate an HTML report in the htmlcov directory.
View the most recent logs from your application:
# Show last 50 lines from all log files
make logs
# Show a different number of lines
make logs LINES=100
# Specify a custom log directory
make logs LOG_DIR=/path/to/logs

Ensure your code follows the project's style guidelines:
# Run linters
make lint
# Automatically format your code
make format
# Check types
make typecheck

You can also run tests in a Docker container:
# Build the Docker image
docker build -t dialogchain .
# Run tests in the container
docker run --rm dialogchain make test

DialogChain includes powerful network scanning capabilities to discover devices like cameras and printers on your local network.
Scan your local network for various devices and services:
make scan-network

Find RTSP cameras on your network:

make scan-cameras

List all available printers on your system:

make scan-printers

Send a test page to your default printer:

make print-test

You can also use the network scanner directly in your Python code:
from dialogchain.scanner import NetworkScanner
import asyncio
async def scan_network():
    scanner = NetworkScanner()

    # Scan for all services
    services = await scanner.scan_network()

    # Or scan for specific service types
    cameras = await scanner.scan_network(service_types=['rtsp'])

    for service in services:
        print(f"{service.ip}:{service.port} - {service.service} ({service.banner})")

# Run the scan
asyncio.run(scan_network())

DialogChain includes basic printing capabilities using the CUPS (Common Unix Printing System) interface.
import cups
import tempfile

def print_text(text, printer_name=None):
    conn = cups.Connection()
    printers = conn.getPrinters()
    if not printers:
        print("No printers available")
        return
    printer = printer_name or list(printers.keys())[0]
    # printFile() expects a file on disk, so write the text to a temporary file first
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write(text)
        path = tmp.name
    job_id = conn.printFile(printer, path, "DialogChain Print", {})
    print(f"Sent print job {job_id} to {printer}")

# Example usage
print_text("Hello from DialogChain!")

def print_file(file_path, printer_name=None):
    conn = cups.Connection()
    printers = conn.getPrinters()
    if not printers:
        print("No printers available")
        return
    printer = printer_name or list(printers.keys())[0]
    job_id = conn.printFile(printer, file_path, "Document Print", {})
    print(f"Sent print job {job_id} to {printer}")

# Example usage
print_file("document.pdf")

- Python 3.8+
- Poetry
- Clone the repository:
  git clone https://github.com/dialogchain/python.git
  cd python
- Install dependencies:
  poetry install
- Activate the virtual environment:
  poetry shell
- Install development and test dependencies:
  poetry install --with dev,test
- Set up pre-commit hooks:
  pre-commit install
- Run tests:
  make test

Or with coverage report:
make coverage
dialogchain/
├── src/
│   └── dialogchain/              # Main package
│       ├── __init__.py
│       ├── cli.py                # Command-line interface
│       ├── config.py             # Configuration handling
│       ├── connectors/           # Connector implementations
│       ├── engine.py             # Core engine
│       ├── exceptions.py         # Custom exceptions
│       ├── processors/           # Processor implementations
│       └── utils.py              # Utility functions
├── tests/                        # Test files
│   ├── unit/                     # Unit tests
│   │   ├── core/                 # Core functionality tests
│   │   ├── connectors/           # Connector tests
│   │   └── ...
│   └── integration/              # Integration tests
├── .github/                      # GitHub workflows
├── docs/                         # Documentation
├── .gitignore
├── .pre-commit-config.yaml
├── Makefile                      # Common development commands
├── pyproject.toml                # Project metadata and dependencies
└── README.md
Run the full test suite:
make test

Run specific test categories:
# Unit tests
make test-unit
# Integration tests
make test-integration
# With coverage report
make coverage

Format and check code style:
make format # Auto-format code
make lint # Run linters
make typecheck # Run type checking
make check-all  # Run all checks

- Create a configuration file config.yaml:

version: 1.0
pipelines:
  - name: basic_dialog
    steps:
      - type: input
        name: user_input
        source: console
      - type: processor
        name: nlp_processor
        module: dialogchain.processors.nlp
        function: process_text
      - type: output
        name: response
        target: console

- Run the dialog chain:

poetry run dialogchain -c config.yaml
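The `nlp_processor` step above points at a module and function rather than a class. The real `dialogchain.processors.nlp.process_text` is not reproduced in this README; purely as an illustration of what a function-style processor can look like (a hypothetical sketch, not the actual implementation):

```python
# Illustrative only: the signature and return shape are assumptions,
# not the actual dialogchain.processors.nlp.process_text.
def process_text(message: dict) -> dict:
    text = message.get("text", "")
    return {
        **message,
        "text": text.strip(),
        "tokens": text.split(),                    # naive whitespace tokenization
        "is_question": text.rstrip().endswith("?"),
    }
```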
- 💬 Dialog Management: Stateful conversation handling and context management
- 🤖 Multi-Language Support: Python, Go, Rust, C++, Node.js processors
- 🔌 Flexible Connectors: REST APIs, WebSockets, gRPC, MQTT, and more
- 🧠 ML/NLP Integration: Built-in support for popular NLP libraries and models
- ⚙️ Simple Configuration: YAML/JSON configuration with environment variables
- 🐳 Cloud Native: Docker, Kubernetes, and serverless deployment ready
- 📊 Production Ready: Monitoring, logging, and error handling
- 🧪 Comprehensive Testing: Unit, integration, and end-to-end tests
- 🔍 Code Quality: Type hints, linting, and code formatting
- 📈 Scalable: Horizontal scaling for high-throughput applications
This project uses pytest, black, isort, flake8, and mypy for testing and code quality:
# Run tests with coverage
poetry run pytest --cov=dialogchain --cov-report=term-missing
# Format code
poetry run black .
poetry run isort .
# Lint code
poetry run flake8
# Type checking
poetry run mypy dialogchain

┌─────────────┐    ┌──────────────────┐    ┌─────────────┐
│   Inputs    │    │    Processors    │    │   Outputs   │
├─────────────┤    ├──────────────────┤    ├─────────────┤
│ HTTP API    │───►│ NLP Processing   │───►│ REST API    │
│ WebSocket   │    │ Intent Detection │    │ WebSocket   │
│ gRPC        │    │ Entity Extraction│    │ gRPC        │
│ CLI         │    │ Dialog Management│    │ Message Bus │
│ Message Bus │    │ Response Gen     │    │ Logging     │
└─────────────┘    └──────────────────┘    └─────────────┘
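Conceptually, the diagram is a chain of async stages: messages are pulled from an input, passed through processors in order, and fanned out to the outputs. The engine's real implementation lives in `engine.py`; the toy sketch below only illustrates that flow with plain `asyncio` and is not DialogChain code:

```python
import asyncio

async def console_source():
    # Toy input: yields a couple of messages instead of a real HTTP/gRPC feed
    for text in ["hello", "how are you?"]:
        yield {"text": text}

async def detect_intent(message):
    # Stand-in for an NLP / intent-detection processor
    message["intent"] = "greeting" if "hello" in message["text"] else "other"
    return message

async def log_output(message):
    # Stand-in for a REST/WebSocket/logging output
    print("OUT:", message)

async def run_pipeline(source, processors, outputs):
    async for message in source():
        for processor in processors:
            message = await processor(message)
        await asyncio.gather(*(output(message) for output in outputs))

asyncio.run(run_pipeline(console_source, [detect_intent], [log_output]))
```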
# Clone repository
git clone https://github.com/dialogchain/python
cd python
# Install dependencies
poetry install
# Run the application
poetry run dialogchain --help

Create your .env file:
# Copy template and edit
cp .env.example .env

Example .env:
CAMERA_USER=admin
CAMERA_PASS=your_password
CAMERA_IP=192.168.1.100
SMTP_USER=alerts@company.com
SMTP_PASS=app_password
SECURITY_EMAIL=security@company.com

Generate a configuration template:

dialogchain init --template camera --output my_config.yaml

Example route (simplified YAML):
routes:
  - name: "smart_security_camera"
    from: "rtsp://{{CAMERA_USER}}:{{CAMERA_PASS}}@{{CAMERA_IP}}/stream1"
    processors:
      # Python: Object detection
      - type: "external"
        command: "python scripts/detect_objects.py"
        config:
          confidence_threshold: 0.6
          target_objects: ["person", "car"]
      # Filter high-risk only
      - type: "filter"
        condition: "{{threat_level}} == 'high'"
    to:
      - "smtp://{{SMTP_SERVER}}:{{SMTP_PORT}}?user={{SMTP_USER}}&password={{SMTP_PASS}}&to={{SECURITY_EMAIL}}"
      - "http://webhook.company.com/security-alert"

Run all routes:
dialogchain run -c my_config.yaml

Run a specific route:

dialogchain run -c my_config.yaml --route smart_dialog_flow

Dry run to see what would execute:

dialogchain run -c my_config.yaml --dry-run
🔍 DRY RUN - Configuration Analysis:
==================================================
📍 Route: front_door_camera
   From: rtsp://:@/stream1
   Processors:
     1. external
        Command: python -m ultralytics_processor
     2. filter
     3. transform
   To:
     • smtp://:?user=&password=&to=

| Source | Example URL | Description |
|---|---|---|
| RTSP Camera | `rtsp://user:pass@ip/stream1` | Live video streams |
| Timer | `timer://5m` | Scheduled execution |
| File | `file:///path/to/watch` | File monitoring |
| gRPC | `grpc://localhost:50051/Service/Method` | gRPC endpoints |
| MQTT | `mqtt://broker:1883/topic` | MQTT messages |
Delegate to any programming language:
processors:
  # Python ML inference
  - type: "external"
    command: "python scripts/detect_objects.py"
    input_format: "json"
    output_format: "json"
    config:
      model: "yolov8n.pt"
      confidence_threshold: 0.6

  # Go image processing
  - type: "external"
    command: "go run scripts/image_processor.go"
    config:
      thread_count: 4
      optimization: "speed"

  # Rust performance-critical tasks
  - type: "external"
    command: "cargo run --bin data_processor"
    config:
      batch_size: 32
      simd_enabled: true

  # C++ optimized algorithms
  - type: "external"
    command: "./bin/cpp_postprocessor"
    config:
      algorithm: "fast_nms"
      threshold: 0.85

  # Node.js business logic
  - type: "external"
    command: "node scripts/business_rules.js"
    config:
      rules_file: "security_rules.json"

Built-in processors:

processors:
  # Filter messages
  - type: "filter"
    condition: "{{confidence}} > 0.7"

  # Transform output
  - type: "transform"
    template: "Alert: {{object_type}} detected at {{position}}"

  # Aggregate over time
  - type: "aggregate"
    strategy: "collect"
    timeout: "5m"
    max_size: 100

| Destination | Example URL | Description |
|---|---|---|
| Email | `smtp://smtp.gmail.com:587?user={{USER}}&password={{PASS}}&to={{EMAILS}}` | SMTP alerts |
| HTTP | `http://api.company.com/webhook` | REST API calls |
| MQTT | `mqtt://broker:1883/alerts/camera` | MQTT publishing |
| File | `file:///logs/alerts.log` | File logging |
| gRPC | `grpc://service:50051/AlertService/Send` | gRPC calls |
dialogchain/
├── dialogchain/              # Python package
│   ├── cli.py                # Command line interface
│   ├── engine.py             # Main routing engine
│   ├── processors.py         # Processing components
│   └── connectors.py         # Input/output connectors
├── scripts/                  # External processors
│   ├── detect_objects.py     # Python: YOLO detection
│   ├── health_check.go       # Go: Health monitoring
│   └── business_rules.js     # Node.js: Business logic
├── examples/                 # Configuration examples
│   └── simple_routes.yaml    # Sample routes
├── k8s/                      # Kubernetes deployment
│   └── deployment.yaml       # K8s manifests
├── Dockerfile                # Container definition
├── Makefile                  # Build automation
└── README.md                 # This file
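The programs under `scripts/` are ordinary standalone executables. With `input_format: "json"` and `output_format: "json"` (as in the external processor examples above), a processor can simply read a JSON message and emit a JSON result. Below is a minimal hypothetical Python skeleton in the spirit of `detect_objects.py`; the real script, its flags, and its message schema are not shown here:

```python
#!/usr/bin/env python3
"""Skeleton external processor: JSON in on stdin, JSON out on stdout (illustrative only)."""
import json
import sys

def process(message: dict) -> dict:
    # Replace this stub with real work (e.g. running an object detector)
    return {**message, "detections": [], "threat_level": "low"}

def main() -> None:
    message = json.load(sys.stdin)    # read one JSON message
    result = process(message)
    json.dump(result, sys.stdout)     # write the processed message
    sys.stdout.write("\n")

if __name__ == "__main__":
    main()
```

A script following this contract can be exercised directly with a pipe, much like the troubleshooting section below does with `detect_objects.py`.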
# Build all processors
make build-all
# Build specific language
make build-go
make build-rust
make build-cpp
# Install dependencies
make install-deps

# Development environment
make dev
# Run tests
make test
# Lint code
make lint
# Build distribution
make build

# Build image
make docker
# Run with Docker
docker run -it --rm \
-v $(PWD)/examples:/app/examples \
-v $(PWD)/.env:/app/.env \
dialogchain:latest
# Or use Make
make docker-run

version: "3.8"
services:
  dialogchain:
    build: .
    environment:
      - CAMERA_IP=192.168.1.100
      - MQTT_BROKER=mqtt
    volumes:
      - ./examples:/app/examples
      - ./logs:/app/logs
    depends_on:
      - mqtt
      - redis

  mqtt:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

# Deploy to Kubernetes
make deploy-k8s
# Or manually
kubectl apply -f k8s/
# Check status
kubectl get pods -n dialogchain
# View logs
kubectl logs -f deployment/dialogchain -n dialogchain

Features in Kubernetes:
- Horizontal Pod Autoscaling: Auto-scale based on CPU/memory/custom metrics
- Resource Management: CPU/memory limits and requests
- Health Checks: Liveness and readiness probes
- Persistent Storage: Shared volumes for model files and logs
- Service Discovery: Internal service communication
- Monitoring: Prometheus metrics integration
# Health check endpoint
curl http://localhost:8080/health
# Metrics endpoint (Prometheus format)
curl http://localhost:8080/metrics
# Runtime statistics
curl http://localhost:8080/stats

# View real-time logs
make logs
# Start monitoring dashboard
make monitor

# Run benchmarks

make benchmark

# Camera settings
CAMERA_USER=admin
CAMERA_PASS=password
CAMERA_IP=192.168.1.100
CAMERA_NAME=front_door
# Email settings
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=alerts@company.com
SMTP_PASS=app_password
SECURITY_EMAIL=security@company.com
# Service URLs
WEBHOOK_URL=https://hooks.company.com
ML_GRPC_SERVER=localhost:50051
DASHBOARD_URL=https://dashboard.company.com
# MQTT settings
MQTT_BROKER=localhost
MQTT_PORT=1883
MQTT_USER=dialogchain
MQTT_PASS=secret
# Advanced settings
MAX_CONCURRENT_ROUTES=10
DEFAULT_TIMEOUT=30
LOG_LEVEL=info
METRICS_ENABLED=true

routes:
- name: "route_name" # Required: Route identifier
from: "source_uri" # Required: Input source
processors: # Optional: Processing pipeline
- type: "processor_type"
config: {}
to: ["destination_uri"] # Required: Output destinations
# Global settings
settings:
  max_concurrent_routes: 10
  default_timeout: 30
  log_level: "info"
  metrics_enabled: true
  health_check_port: 8080
# Required environment variables
env_vars:
  - CAMERA_USER
  - SMTP_PASS

- Input: RTSP cameras, motion sensors
- Processing: Python (YOLO), Go (risk analysis), Node.js (rules)
- Output: Email alerts, mobile push, dashboard
- Input: Factory cameras, sensor data
- Processing: Python (defect detection), C++ (performance), Rust (safety)
- Output: MQTT control, database, operator alerts
- Input: MQTT sensors, HTTP APIs
- Processing: Go (aggregation), Python (analytics), Node.js (business logic)
- Output: Time-series DB, real-time dashboard, alerts
- Input: File uploads, streaming video
- Processing: Python (ML inference), C++ (codec), Rust (optimization)
- Output: CDN upload, metadata database, webhooks
# Test RTSP connection
ffmpeg -i rtsp://user:pass@ip/stream1 -frames:v 1 test.jpg
# Check network connectivity
ping camera_ip
telnet camera_ip 554

# Test processor manually
echo '{"test": "data"}' | python scripts/detect_objects.py --input /dev/stdin
# Check dependencies
which go python node cargo
# View processor logs
dialogchain run -c config.yaml --verbose

# Monitor resource usage
htop
# Check route performance
make benchmark
# Optimize configuration
# - Reduce frame processing rate
# - Increase batch sizes
# - Use async processors

# Enable verbose logging
dialogchain run -c config.yaml --verbose
# Dry run to test configuration
dialogchain run -c config.yaml --dry-run
# Validate configuration
dialogchain validate -c config.yaml

- Fork the repository
- Create feature branch: git checkout -b feature/amazing-feature
- Make changes and add tests
- Run checks: make dev-workflow
- Commit changes: git commit -m 'Add amazing feature'
- Push to branch: git push origin feature/amazing-feature
- Open Pull Request
# Clone and setup
git clone https://github.com/dialogchain/python
cd python
make dev
# Run tests
make test
For questions or support, please open an issue in the [issue tracker](https://github.com/taskinity/dialogchain/issues).
## 🔗 Related Projects
- **[Apache Camel](https://camel.apache.org/)**: Original enterprise integration framework
- **[GStreamer](https://gstreamer.freedesktop.org/)**: Multimedia framework
- **[Apache NiFi](https://nifi.apache.org/)**: Data flow automation
- **[Kubeflow](https://kubeflow.org/)**: ML workflows on Kubernetes
- **[TensorFlow Serving](https://tensorflow.org/tfx/serving)**: ML model serving
## 💡 Roadmap
- [ ] **Web UI**: Visual route designer and monitoring dashboard
- [ ] **More Connectors**: Database, cloud storage, message queues
- [ ] **Model Registry**: Integration with MLflow, DVC
- [ ] **Stream Processing**: Apache Kafka, Apache Pulsar support
- [ ] **Auto-scaling**: Dynamic processor scaling based on load
- [ ] **Security**: End-to-end encryption, authentication, authorization
- [ ] **Templates**: Pre-built templates for common use cases
---
**Built with ❤️ for the ML and multimedia processing community**
[⭐ Star us on GitHub](https://github.com/dialogchain/python) | [📖 Documentation](https://docs.dialogchain.org) | [💬 Community](https://discord.gg/dialogchain)