30 commits
- a0aaa38 Added MS Fabric output plugin (asaharn, Apr 17, 2025)
- 868358f Added Tests (asaharn, Apr 17, 2025)
- fdf2fb1 test cases added (asaharn, Apr 17, 2025)
- ca7ece3 comitted readme (asaharn, Apr 21, 2025)
- 38b9061 rename (asaharn, Apr 21, 2025)
- 360ada3 Updated as per code review comments (asaharn, May 6, 2025)
- d7ed8e5 lint fix (asaharn, May 6, 2025)
- c3286ea lint fix for readme (asaharn, May 6, 2025)
- 8c60846 break lines for linting (asaharn, May 6, 2025)
- 334a46e reducing line sizes in md (asaharn, May 6, 2025)
- f28d50b tests fixes (asaharn, May 6, 2025)
- b0dc85c formatting (asaharn, May 7, 2025)
- 9f721d7 moved fabricoutput struct (asaharn, May 7, 2025)
- 808242a Changes to make connection string based property parsing (asaharn, May 19, 2025)
- 2deec8c linting changes (asaharn, May 20, 2025)
- 67e8ae1 Readme linting (asaharn, May 20, 2025)
- ef49ef6 readme linting (asaharn, May 20, 2025)
- 7266952 Merge branch 'master' of https://github.com/asaharn/telegraf into fea… (asaharn, May 20, 2025)
- 17c1ff8 *Added new test (asaharn, Jun 2, 2025)
- 48e2652 Lint changes (asaharn, Jun 2, 2025)
- 9794317 Include sample.conf in README (srebhan, Jun 3, 2025)
- 6588065 Cleanup plugin tests (srebhan, Jun 3, 2025)
- 8264909 Cleanup plugin tests (srebhan, Jun 3, 2025)
- 2750c5e Cleanup plugin tests (srebhan, Jun 3, 2025)
- 110000b Cleanup eventstream output (srebhan, Jun 3, 2025)
- 6d31472 Cleanup eventhouse output (srebhan, Jun 3, 2025)
- 45c01c9 Cleanup documentation (srebhan, Jun 3, 2025)
- e8eb802 Fix batching iteration (srebhan, Jun 3, 2025)
- 6be4d7d Added few changes as per second review cycle (asaharn, Jun 12, 2025)
- d901340 Merge branch 'master' of https://github.com/asaharn/telegraf into fea… (asaharn, Jun 12, 2025)
5 changes: 5 additions & 0 deletions plugins/outputs/all/microsoft_fabric.go
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.microsoft_fabric

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/microsoft_fabric" // register plugin
148 changes: 148 additions & 0 deletions plugins/outputs/microsoft_fabric/README.md
@@ -0,0 +1,148 @@
# Microsoft Fabric Output Plugin

This plugin writes metrics to [Real time analytics in Fabric][fabric] services.

⭐ Telegraf v1.35.0
🏷️ datastore
💻 all

[fabric]: https://learn.microsoft.com/en-us/fabric/real-time-analytics/overview

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used
to modify metrics, tags, and fields, or create aliases and configure
ordering, etc. See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# Sends metrics to Microsoft Fabric
[[outputs.microsoft_fabric]]
## The URI property of the resource on Azure
connection_string = "https://trd-abcd.xx.kusto.fabric.microsoft.com;Database=kusto_eh;Table Name=telegraf_dump;Key=value"

## Client timeout
# timeout = "30s"
```

### Connection String

The `connection_string` setting provides the information necessary for the
plugin to establish a connection to the Fabric service endpoint. It is a
semicolon-delimited list of name-value parameter pairs, optionally prefixed
by a single URI. The setting is specific to the type of endpoint you are
using. The sections below detail the required and available name-value pairs
for each endpoint type.
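
The parsing rules above can be sketched as a small standalone Go helper
(`parseConnectionString` is a hypothetical name, not part of the plugin's
API): pairs are split on `;`, cut on the first `=`, and keys are lowercased
with spaces removed while values are only trimmed.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnectionString is a hypothetical sketch of the key-value parsing
// described above. Keys are normalized (lowercased, spaces removed) so that
// "Data Source", "data source" and "DataSource" all map to "datasource".
func parseConnectionString(cs string) (map[string]string, error) {
	props := make(map[string]string)
	for _, pair := range strings.Split(cs, ";") {
		// Skip empty pairs such as a trailing semicolon
		if strings.TrimSpace(pair) == "" {
			continue
		}
		k, v, found := strings.Cut(pair, "=")
		if !found {
			return nil, fmt.Errorf("invalid connection string format: %s", pair)
		}
		key := strings.ReplaceAll(strings.ToLower(strings.TrimSpace(k)), " ", "")
		props[key] = strings.TrimSpace(v)
	}
	return props, nil
}

func main() {
	props, err := parseConnectionString(
		"Data Source=https://example.kusto.fabric.microsoft.com;Database=mydb;Table Name=telegraf")
	if err != nil {
		panic(err)
	}
	fmt.Println(props["datasource"], props["database"], props["tablename"])
}
```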

### EventHouse

This plugin allows you to leverage Microsoft Fabric's capabilities to store
and analyze your Telegraf metrics. Eventhouse is a high-performance, scalable
data store designed for real-time analytics. It allows you to ingest, store,
and query large volumes of data with low latency. For more information, visit
the [Eventhouse documentation][eventhousedocs].

The following table lists all possible properties that can be included in a
connection string and provides the alias names for each property.

| Property name | Aliases | Description |
|---|---|---|
| Client Version for Tracing | | The property used when tracing the client version. |
| Data Source | Addr, Address, Network Address, Server | The URI specifying the Kusto service endpoint. For example, `https://mycluster.fabric.windows.net`. |
| Initial Catalog | Database | The default database name. For example, `MyDatabase`. |
| Ingestion Type | IngestionType | Ingestion method; either `managed` for streaming ingestion with fallback to batched ingestion, or `queued` for queuing up metrics and processing them sequentially |
| Table Name | TableName | Name of the single table to store all metrics; only required if `Metrics Grouping Type` is `singletable` |
| Create Tables | CreateTables | Creates tables and the relevant mapping if `true` (default). Otherwise table and mapping creation is skipped. This is useful for running Telegraf with the lowest possible permissions, e.g. the table ingestor role. |
| Metrics Grouping Type | MetricsGroupingType | Type of metrics grouping used when pushing to Eventhouse, either `tablepermetric` or `singletable`. Defaults to `tablepermetric`, i.e. one table per metric name. |

[eventhousedocs]: https://learn.microsoft.com/fabric/real-time-intelligence/eventhouse
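
As an illustration, an Eventhouse connection string combining several of
these properties could look like the following (the cluster URI, database,
and table names are placeholders, not working values):

```toml
[[outputs.microsoft_fabric]]
  ## Hypothetical Eventhouse endpoint; database and table names are examples
  connection_string = "Data Source=https://mycluster.kusto.fabric.microsoft.com;Database=MyDatabase;Ingestion Type=queued;Metrics Grouping Type=singletable;Table Name=telegraf"
```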

#### Metrics Grouping

Metrics can be grouped in two ways when sent to Azure Data Explorer. To
specify which grouping the plugin should use, set the `Metrics Grouping Type`
property in the connection string. If no value is given, metrics are grouped
using `tablepermetric` by default.

#### TablePerMetric

The plugin groups the metrics by metric name and sends each group of metrics
to a separate Azure Data Explorer table. If the table doesn't exist, the
plugin creates it; if the table already exists, the plugin tries to merge the
Telegraf metric schema into the existing table. For more information about
the merge process check the [`.create-merge` documentation][create-merge].

The table name will match the metric name, so the metric name must comply
with the Azure Data Explorer table naming constraints; keep this in mind in
case you plan to add a prefix to the metric name.

[create-merge]: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/create-merge-table-command

#### SingleTable

The plugin sends all received metrics to a single Azure Data Explorer table.
The name of the table must be supplied via the `Table Name` property of the
`connection_string`. If the table doesn't exist, the plugin creates it; if
the table already exists, the plugin tries to merge the Telegraf metric
schema into the existing table. For more information about the merge process
check the [`.create-merge` documentation][create-merge].

#### Tables Schema

The schema of the Eventhouse table will match the structure of the metric.
The corresponding command generated by the plugin would be like the following:

```kql
.create-merge table ['table-name'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime)
```

The corresponding table mapping would be like the following:

```kql
.create-or-alter table ['table-name'] ingestion json mapping 'table-name_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'
```

> [!NOTE]
> This plugin will automatically create tables and corresponding table mapping
> using the command above.

#### Ingestion type

> [!NOTE]
> [Streaming ingestion][streaming] has to be enabled on ADX in case of
> `managed` operation.

Use the following query to check whether streaming ingestion is enabled on
your database:

```kql
.show database <DB-Name> policy streamingingestion
```

To learn more about configuration, supported authentication methods and querying
ingested data, check the [documentation][ethdocs].

[streaming]: https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp
[ethdocs]: https://learn.microsoft.com/azure/data-explorer/ingest-data-telegraf

### Eventstream

Eventstreams allow you to bring real-time events into Fabric, transform them,
and then route them to various destinations without writing any code.
For more information, visit the [Eventstream documentation][eventstream_docs].

To communicate with an eventstream, you need to specify a connection string for
the namespace or the event hub. The following properties can be added to the
standard [Eventstream connection string][ecs] using key-value pairs:

| Property name | Aliases | Description |
|---|---|---|
| Partition Key | PartitionKey | Metric tag or field name to use for the event partition key. If both a tag and a field with this name exist, the tag takes precedence; if neither exists, the value `<default>` is used |
| Max Message Size | MaxMessageSize | Maximum batch message size in bytes. The allowable size depends on the Event Hub tier, see [tier information][tiers] for details. If unset, the default size defined by Azure Event Hubs is used (currently 1,000,000 bytes) |

[eventstream_docs]: https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/overview?tabs=enhancedcapabilities
[ecs]: https://learn.microsoft.com/azure/event-hubs/event-hubs-get-connection-string
[tiers]: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
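
For example, an Eventstream connection string is the standard Event Hub
connection string with the plugin-specific properties appended as additional
pairs (the namespace, key, and entity below are placeholders):

```toml
[[outputs.microsoft_fabric]]
  ## Hypothetical Eventstream connection string; names and secret are examples
  connection_string = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=key_name;SharedAccessKey=secret;EntityPath=es_entity;Partition Key=host;Max Message Size=1000000"
```
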
167 changes: 167 additions & 0 deletions plugins/outputs/microsoft_fabric/event_house.go
@@ -0,0 +1,167 @@
package microsoft_fabric

import (
"errors"
"fmt"
"slices"
"strings"
"time"

"github.com/Azure/azure-kusto-go/kusto/ingest"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/adx"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

type eventhouse struct {
connectionString string
adx.Config

client *adx.Client
log telegraf.Logger
serializer telegraf.Serializer
}

func (e *eventhouse) init() error {
// Initialize defaults
e.CreateTables = true

// Parse the connection string by splitting it into key-value pairs
// and extract the extra keys used for plugin configuration
pairs := strings.Split(e.connectionString, ";")
for _, pair := range pairs {
// Skip empty pairs
if strings.TrimSpace(pair) == "" {
continue
}
// Split each pair into key and value
k, v, found := strings.Cut(pair, "=")
if !found {
return fmt.Errorf("invalid connection string format: %s", pair)
}

// Only lowercase the keys as the values might be case sensitive
k = strings.ToLower(strings.TrimSpace(k))
v = strings.TrimSpace(v)

key := strings.ReplaceAll(k, " ", "")
switch key {
case "datasource", "addr", "address", "networkaddress", "server":
e.Endpoint = v
case "initialcatalog", "database":
e.Database = v
case "ingestiontype":
e.IngestionType = v
case "tablename":
e.TableName = v
case "createtables":
switch v {
case "true":
e.CreateTables = true
case "false":
e.CreateTables = false
default:
return fmt.Errorf("invalid setting %q for %q", v, k)
}
case "metricsgroupingtype":
if v != adx.TablePerMetric && v != adx.SingleTable {
return errors.New("invalid metrics grouping type: " + v)
}
e.MetricsGrouping = v
}
}

// Setup the JSON serializer
serializer := &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
if err := serializer.Init(); err != nil {
return fmt.Errorf("initializing JSON serializer failed: %w", err)
}
e.serializer = serializer

return nil
}

func (e *eventhouse) Connect() error {
client, err := e.NewClient("Kusto.Telegraf", e.log)
if err != nil {
return fmt.Errorf("creating new client failed: %w", err)
}
e.client = client

return nil
}

func (e *eventhouse) Write(metrics []telegraf.Metric) error {
if e.MetricsGrouping == adx.TablePerMetric {
return e.writeTablePerMetric(metrics)
}
return e.writeSingleTable(metrics)
}

func (e *eventhouse) Close() error {
return e.client.Close()
}

func (e *eventhouse) writeTablePerMetric(metrics []telegraf.Metric) error {
tableMetricGroups := make(map[string][]byte)
// Group metrics by name and serialize them
for _, m := range metrics {
tableName := m.Name()
metricInBytes, err := e.serializer.Serialize(m)
if err != nil {
return err
}
if existingBytes, ok := tableMetricGroups[tableName]; ok {
tableMetricGroups[tableName] = append(existingBytes, metricInBytes...)
} else {
tableMetricGroups[tableName] = metricInBytes
}
}

// Push the metrics for each table
format := ingest.FileFormat(ingest.JSON)
for tableName, tableMetrics := range tableMetricGroups {
if err := e.client.PushMetrics(format, tableName, tableMetrics); err != nil {
return err
}
}

return nil
}

func (e *eventhouse) writeSingleTable(metrics []telegraf.Metric) error {
// Serialize each metric and append the payload to a single buffer
metricsArray := make([]byte, 0)
for _, m := range metrics {
metricsInBytes, err := e.serializer.Serialize(m)
if err != nil {
return err
}
metricsArray = append(metricsArray, metricsInBytes...)
}

// Push all metrics to the single configured table
format := ingest.FileFormat(ingest.JSON)
return e.client.PushMetrics(format, e.TableName, metricsArray)
}

func isEventhouseEndpoint(endpoint string) bool {
prefixes := []string{
"data source=",
"addr=",
"address=",
"network address=",
"server=",
}

ep := strings.ToLower(endpoint)
return slices.ContainsFunc(prefixes, func(prefix string) bool {
return strings.HasPrefix(ep, prefix)
})
}