Xelians DataHub Studio is the official extension framework for building external integration workers for the Xelians DataHub platform.
It allows developers to implement custom:
- Collectors — retrieve data from external systems
- Transformers — process and consolidate data
- Senders — deliver processed data to target systems
The framework provides:
- Controlled execution lifecycle
- Shared and scalable thread pools
- Retry management
- Persistent channel state
- Metrics publication
- Structured configuration model
- Java Version: 17 (minimum) to 25 (maximum)
Add the dependency to your pom.xml:

```xml
<dependency>
    <groupId>fr.xelians</groupId>
    <artifactId>datahub-studio</artifactId>
    <version>4.2.0</version>
</dependency>
```

A Channel represents a complete data transfer pipeline.
Each channel contains:
- 1 Collector
- 1 Transformer
- 1 Sender
Workers are instantiated at channel startup using parameters configured through the DataHub UI.
```
Scheduler -> Collector Pool -> Transformer Pool -> Sender Pool
```
Channels are assigned to a set of shared thread pools:
- A Collect Pool
- A Transform Pool
- A Send Pool
Multiple channels may share the same pool instances.
- Single collect execution per channel at a time
- Transform and Send tasks are queued
- Execution model: at-least-once
- Senders may be retried
Workers must be thread-safe if reused.
Workers must use:

```java
TempWorkDir.dir();
```

Location: `${java.io.tmpdir}/xdh`

Used for:
- Archive extraction
- Intermediate file generation
- Temporary processing

- Workers MUST delete created files and directories.
- This directory is periodically purged by the application.
- It must NOT be used for persistent state.
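The cleanup rule above can be sketched as follows. `TempWorkDir.dir()` is replaced here by `Files.createTempDirectory` so the sketch stays self-contained and runnable; the point is the try/finally pattern that guarantees the worker deletes what it created:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class TempWorkSketch {

    // Process a payload inside a scratch directory and always clean up.
    // In a real worker the scratch root would come from TempWorkDir.dir();
    // Files.createTempDirectory is used here only to keep the sketch runnable.
    static String processInScratchDir(byte[] payload) throws IOException {
        Path scratch = Files.createTempDirectory("xdh-sketch");
        Path tmpFile = scratch.resolve("payload.bin");
        try {
            Files.write(tmpFile, payload);
            // ... archive extraction or intermediate processing would happen here ...
            return "processed " + Files.size(tmpFile) + " bytes";
        } finally {
            // Workers MUST delete created files and directories: file first, then directory.
            Files.deleteIfExists(tmpFile);
            Files.deleteIfExists(scratch);
        }
    }
}
```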
```java
List<String> collect(
    Path targetDirectory,
    String webhook,
    XDHProcessLogger logger,
    ChannelStore channelStore,
    ChannelMetricsRepository metricsRepository
) throws Exception;
```

- Retrieve 1..n files
- Write files to targetDirectory
- Return filenames only (no absolute paths)
- Generate unique filenames
- Keep execution bounded
- Avoid long blocking operations
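The file-handling side of this contract can be sketched as follows. The framework parameters (XDHProcessLogger, ChannelStore, ChannelMetricsRepository) are omitted and the external fetch is simulated by a `payloads` parameter, so this is only an illustration of the rules above, not the real Collector interface:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class CollectorSketch {

    // Illustrates the collect contract: write each retrieved payload into
    // targetDirectory under a unique name and return the bare filenames.
    static List<String> collectInto(Path targetDirectory, List<byte[]> payloads) throws IOException {
        List<String> fileNames = new ArrayList<>();
        for (byte[] payload : payloads) {
            // Unique filename: avoids collisions across runs and channels.
            String name = "collect-" + UUID.randomUUID() + ".bin";
            Files.write(targetDirectory.resolve(name), payload);
            fileNames.add(name); // bare filename only, never an absolute path
        }
        return fileNames;
    }
}
```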
On failure, throw a CollectException:

```java
throw new CollectException("Authentication failed", "AUTH_001");
```

```java
TransformResult transform(
    List<String> fileNames,
    Path sourceDirectory,
    Path targetDirectory,
    XDHProcessLogger logger,
    ChannelStore channelStore,
    ChannelMetricsRepository metricsRepository
) throws Exception;
```

- Transform n input files into 1 output file
- Write output to targetDirectory
- Generate unique filenames
- Remain stateless
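The consolidation step can be sketched as follows. Framework parameters are again omitted, and a simple concatenation stands in for the real transformation logic; what the sketch shows is the n-inputs-to-one-output shape with a unique output name:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;

class TransformerSketch {

    // Illustrates the transform contract: consolidate n input files from
    // sourceDirectory into a single uniquely named output file in
    // targetDirectory, and return the output filename. No state is kept.
    static String mergeFiles(List<String> fileNames, Path sourceDirectory, Path targetDirectory)
            throws IOException {
        String outName = "merged-" + UUID.randomUUID() + ".txt";
        StringBuilder merged = new StringBuilder();
        for (String name : fileNames) {
            merged.append(Files.readString(sourceDirectory.resolve(name)));
        }
        Files.writeString(targetDirectory.resolve(outName), merged.toString());
        return outName;
    }
}
```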
On failure, throw a TransformException:

```java
throw new TransformException("Invalid format", "FORMAT_001");
```

```java
SenderResult send(
    String fileName,
    Path sourceDirectory,
    Path resultDirectory,
    XDHProcessLogger logger,
    ChannelStore channelStore,
    ChannelMetricsRepository metricsRepository
) throws Exception;
```

- Send the file to the target system
- Optionally write acknowledgment files to resultDirectory
- Return a SenderResult
- Be idempotent when possible
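Because the execution model is at-least-once, a retried send task may see the same file twice. One way to make that safe is a delivery marker in the channel store, sketched below with a plain Map standing in for the framework-provided ChannelStore and a list standing in for the remote call:

```java
import java.util.List;
import java.util.Map;

class SenderSketch {

    // Illustrates idempotent sending under at-least-once delivery: a key in
    // the (stubbed) channel store records files already sent, so a retried
    // task skips the remote call instead of duplicating the transfer.
    static boolean sendOnce(String fileName, Map<String, String> store, List<String> remoteCalls) {
        String key = "sent." + fileName;
        if (store.containsKey(key)) {
            return false; // already delivered on a previous attempt: skip
        }
        remoteCalls.add(fileName); // stands in for the actual transfer
        store.put(key, "done");    // persist the idempotency marker
        return true;
    }
}
```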
On failure, throw a SendException:

```java
throw new SendException("Authentication failed", "AUTH_001");
```

To retry a send operation, throw:

```java
throw new DelayExecutionException(60);
```

- The send task will be retried after the given delay in seconds
- Can be combined with locking mechanisms to serialize transfers
- Useful for:
  - Rate limiting
  - Remote throttling
  - Temporary external failures
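A typical use is mapping a remote throttling response onto a delayed retry instead of a hard failure. The sketch below stubs DelayExecutionException locally so it can run on its own; in a real worker the exception comes from datahub-studio:

```java
// Stub of the framework's DelayExecutionException, defined here only so the
// sketch is self-contained; the real class comes from datahub-studio.
class DelayExecutionExceptionStub extends Exception {
    final int delaySeconds;
    DelayExecutionExceptionStub(int delaySeconds) { this.delaySeconds = delaySeconds; }
}

class RetrySketch {

    // Maps a throttling response from the remote system onto a delayed retry:
    // instead of failing the channel, the send task is rescheduled.
    static void sendOrDelay(int httpStatus) throws DelayExecutionExceptionStub {
        if (httpStatus == 429) {
            // Remote throttling: ask the framework to retry after 60 seconds.
            throw new DelayExecutionExceptionStub(60);
        }
        // ... normal delivery path ...
    }
}
```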
Persistent key-value storage scoped to the channel.

Use cases:

- Pagination cursors
- Incremental synchronization
- Execution flags
- Retry state
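The pagination-cursor use case can be sketched as follows, with a plain Map standing in for the channel-scoped store. Because the cursor is persisted between executions, each collect run resumes where the previous one ended:

```java
import java.util.List;
import java.util.Map;

class CursorSketch {

    // Illustrates incremental collection with a persisted pagination cursor.
    // The Map stands in for the channel-scoped ChannelStore: the cursor
    // survives between executions, so each run picks up the next page.
    static List<Integer> collectNextPage(List<Integer> source, int pageSize, Map<String, String> store) {
        int cursor = Integer.parseInt(store.getOrDefault("cursor", "0"));
        int end = Math.min(cursor + pageSize, source.size());
        List<Integer> page = source.subList(cursor, end);
        store.put("cursor", String.valueOf(end)); // persist progress for the next run
        return page;
    }
}
```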
Provides atomic metric increments.
Example:
```java
metricsRepository.increment("files.sent", 1);
```

Metrics are:

- Atomic
- Persisted
- Exposed via supervision APIs
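What "atomic" buys you here is that concurrent increments from several worker threads never lose updates. A minimal stub of such a repository, using only the JDK (the real ChannelMetricsRepository additionally persists and exposes the values):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class MetricsSketch {

    // Stub of an atomic-increment metrics repository: increments from many
    // threads are applied without locks and without lost updates.
    private final ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();

    void increment(String name, long delta) {
        counters.computeIfAbsent(name, k -> new LongAdder()).add(delta);
    }

    long value(String name) {
        LongAdder adder = counters.get(name);
        return adder == null ? 0L : adder.sum();
    }
}
```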
IMPORTANT: The XDHProcessLogger must be used for all logging operations within workers.

This logger records processing information and events that are:

- Structured — associated with the channel execution context
- Persisted — stored in the DataHub workspace directory
- Accessible — viewable through the supervision interface

The logger is provided as a parameter to all worker methods:

```java
logger.info("File processing started: {}", fileName);
logger.error("Failed to process file: {}", fileName, exception);
```

- logger.debug() — detailed diagnostic information
- logger.info() — general informational messages
- logger.warn() — warning conditions
- logger.error() — error events

- Do NOT use standard logging frameworks (e.g., SLF4J, Log4j) directly in workers
- Logs written with XDHProcessLogger are displayed in the supervision UI
- They provide essential traceability for production troubleshooting
Each worker must define a configuration class:

```java
Class<? extends Worker> getWorkerClass();
Label.Translation getName();
Label.Translation description();
String id();
String version();
WorkerForm.Form getForm();
```

Requirements:

- Unique ID
- Versioning recommended
- Constructor aligned with form definition
Parameters are injected via constructor.
Supported types:
- Integer / int
- Boolean / boolean
- Double / double
- Float / float
- Long / long
- String
- Path
- List<Map<String, Object>>
- List<Integer | Boolean | Double | Float | Long | String>
Parameter names must strictly match the form.
Use:

```java
WorkerForm.builder()
```

Constraints:

- Parameter name consistency
- Type consistency
- Constructor signature alignment
Misconfiguration prevents worker loading.
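A hypothetical worker illustrating the alignment rule: each constructor parameter must match a form field in both name and type, or the worker will not load. The field names below are invented for illustration, and the WorkerForm.builder() call itself is framework-specific, so it is omitted:

```java
// Hypothetical collector showing constructor/form alignment.
class HttpCollectorSketch {

    final String endpointUrl; // must match a String form field named "endpointUrl"
    final Integer pageSize;   // must match an Integer form field named "pageSize"
    final Boolean verifySsl;  // must match a Boolean form field named "verifySsl"

    // Parameters are injected by the framework in the order and with the
    // names/types declared in the form definition.
    HttpCollectorSketch(String endpointUrl, Integer pageSize, Boolean verifySsl) {
        this.endpointUrl = endpointUrl;
        this.pageSize = pageSize;
        this.verifySsl = verifySsl;
    }
}
```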
- Channels may share thread pools.
- Execution model is at-least-once.
- Senders may be retried.
- Workers should be idempotent.
- Avoid static mutable state.
- Keep collectors bounded and fast
- Avoid long blocking operations
- Clean temporary files
- Use ChannelStore for state persistence
- Publish meaningful metrics
- Generate unique filenames
- Make senders retry-safe
- Implement idempotency for external calls
- Package your project as a JAR
- Place it in the /lib directory at the application root
- Restart DataHub
Workers are automatically discovered and loaded at startup.
Xelians DataHub Studio provides a robust and scalable framework for building production-grade integration workflows that are:
- Observable
- Retry-safe
- Thread-safe
- Scalable
- Maintainable
By respecting the contracts and execution model described in this document, workers integrate seamlessly into the DataHub platform.