diff --git a/plugins/outputs/all/microsoft_fabric.go b/plugins/outputs/all/microsoft_fabric.go new file mode 100644 index 0000000000000..d8e1602289739 --- /dev/null +++ b/plugins/outputs/all/microsoft_fabric.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.microsoft_fabric + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/microsoft_fabric" // register plugin diff --git a/plugins/outputs/microsoft_fabric/README.md b/plugins/outputs/microsoft_fabric/README.md new file mode 100644 index 0000000000000..692fcad2f3d54 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/README.md @@ -0,0 +1,148 @@ +# Microsoft Fabric Output Plugin + +This plugin writes metrics to [Real time analytics in Fabric][fabric] services. + +⭐ Telegraf v1.35.0 +🏷️ datastore +💻 all + +[fabric]: https://learn.microsoft.com/en-us/fabric/real-time-analytics/overview + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Sends metrics to Microsoft Fabric +[[outputs.microsoft_fabric]] + ## The URI property of the resource on Azure + connection_string = "https://trd-abcd.xx.kusto.fabric.microsoft.com;Database=kusto_eh;Table Name=telegraf_dump;Key=value" + + ## Client timeout + # timeout = "30s" +``` + +### Connection String + +The `connection_string` provide information necessary for the plugin to +establish a connection to the Fabric service endpoint. It is a +semicolon-delimited list of name-value parameter pairs, optionally prefixed by +a single URI. The setting is specific to the type of endpoint you are using. 
+The sections below detail the required and available name-value pairs
for each type.

### EventHouse

This plugin allows you to leverage Microsoft Fabric's capabilities to store and
analyze your Telegraf metrics. Eventhouse is a high-performance, scalable
data-store designed for real-time analytics. It allows you to ingest, store and
query large volumes of data with low latency. For more information, visit the
[Eventhouse documentation][eventhousedocs].

The following table lists all the possible properties that can be included in a
connection string and provides alias names for each property.

| Property name | Aliases | Description |
|---|---|---|
| Client Version for Tracing | | The property used when tracing the client version. |
| Data Source | Addr, Address, Network Address, Server | The URI specifying the Kusto service endpoint. For example, `https://mycluster.fabric.windows.net`. |
| Initial Catalog | Database | The default database name. For example, `MyDatabase`. |
| Ingestion Type | IngestionType | Values can be set to `managed` for streaming ingestion with fallback to batched ingestion or the `queued` method for queuing up metrics and processing them sequentially |
| Table Name | TableName | Name of the single table to store all the metrics; only needed if `metrics_grouping_type` is `singletable` |
| Create Tables | CreateTables | Creates tables and relevant mapping if `true` (default). Otherwise table and mapping creation is skipped. This is useful for running Telegraf with the lowest possible permissions, i.e. the table ingestor role. |
| Metrics Grouping Type | MetricsGroupingType | Type of metrics grouping used when pushing to Eventhouse either being `tablepermetric` or `singletable`. Default is "tablepermetric" for one table per different metric.|

[eventhousedocs]: https://learn.microsoft.com/fabric/real-time-intelligence/eventhouse

#### Metrics Grouping

Metrics can be grouped in two ways to be sent to Eventhouse. 
To specify
which metric grouping type the plugin should use, the respective value should be
given to the `Metrics Grouping Type` in the connection string. If no value is
given, by default, the metrics will be grouped using `tablepermetric`.

#### TablePerMetric

The plugin will group the metrics by the metric name and will send each group
of metrics to an Eventhouse table. If the table doesn't exist, the
plugin will create the table; if the table exists, the plugin will try to
merge the Telegraf metric schema to the existing table. For more information
about the merge process check the [`.create-merge` documentation][create-merge].

The table name will match the metric name, i.e. the name of the metric must
comply with the Eventhouse table naming constraints in case you plan
to add a prefix to the metric name.

[create-merge]: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/create-merge-table-command

#### SingleTable

The plugin will send all the metrics received to a single Eventhouse
table. The name of the table must be supplied via the `Table Name` parameter in
the `connection_string`. If the table doesn't exist, the plugin will create the
table; if the table exists, the plugin will try to merge the Telegraf metric
schema to the existing table. For more information about the merge process check
the [`.create-merge` documentation][create-merge].

#### Tables Schema

The schema of the Eventhouse table will match the structure of the metric. 
+The corresponding command generated by the plugin would be like the following: + +```kql +.create-merge table ['table-name'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime) +``` + +The corresponding table mapping would be like the following: + +```kql +.create-or-alter table ['table-name'] ingestion json mapping 'table-name_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]' +``` + +> [!NOTE] +> This plugin will automatically create tables and corresponding table mapping +> using the command above. + +#### Ingestion type + +> [!NOTE] +> [Streaming ingestion][streaming] has to be enabled on ADX in case of +> `managed` operation. + +Refer to the following query below to check if streaming is enabled: + +```kql +.show database policy streamingingestion +``` + +To learn more about configuration, supported authentication methods and querying +ingested data, check the [documentation][ethdocs]. + +[streaming]: https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp +[ethdocs]: https://learn.microsoft.com/azure/data-explorer/ingest-data-telegraf + +### Eventstream + +Eventstreams allow you to bring real-time events into Fabric, transform them, +and then route them to various destinations without writing any code (no-code). +For more information, visit the [Eventstream documentation][eventstream_docs]. + +To communicate with an eventstream, you need to specify a connection string for +the namespace or the event hub. The following properties can be added to the +standard [Eventstream connection string][ecs] using key-value pairs: + +| Property name | Aliases | Description | +|---|---|---| +| Partition Key | PartitionKey | Metric tag or field name to use for the event partition key if it exists. 
If both, tag and field, exist the tag is takes precedence, otherwise the value `` is used | +| Max Message Size| MaxMessageSize | Maximum batch message size in bytes The allowable size depends on the Event Hub tier, see [tier information][tiers] for details. If unset the default size defined by Azure Event Hubs is used (currently 1,000,000 bytes) | + +[eventstream_docs]: https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/overview?tabs=enhancedcapabilities +[ecs]: https://learn.microsoft.com/azure/event-hubs/event-hubs-get-connection-string +[tiers]: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers diff --git a/plugins/outputs/microsoft_fabric/event_house.go b/plugins/outputs/microsoft_fabric/event_house.go new file mode 100644 index 0000000000000..4acb39e7406a7 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/event_house.go @@ -0,0 +1,167 @@ +package microsoft_fabric + +import ( + "errors" + "fmt" + "slices" + "strings" + "time" + + "github.com/Azure/azure-kusto-go/kusto/ingest" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/adx" + "github.com/influxdata/telegraf/plugins/serializers/json" +) + +type eventhouse struct { + connectionString string + adx.Config + + client *adx.Client + log telegraf.Logger + serializer telegraf.Serializer +} + +func (e *eventhouse) init() error { + // Initialize defaults + e.CreateTables = true + + // Parse the connection string by splitting it into key-value pairs + // and extract the extra keys used for plugin configuration + pairs := strings.Split(e.connectionString, ";") + for _, pair := range pairs { + // Skip empty pairs + if strings.TrimSpace(pair) == "" { + continue + } + // Split each pair into key and value + k, v, found := strings.Cut(pair, "=") + if !found { + return fmt.Errorf("invalid connection string format: %s", pair) + } + + // Only lowercase the keys as 
the values might be case sensitive + k = strings.ToLower(strings.TrimSpace(k)) + v = strings.TrimSpace(v) + + key := strings.ReplaceAll(k, " ", "") + switch key { + case "datasource", "addr", "address", "networkaddress", "server": + e.Endpoint = v + case "initialcatalog", "database": + e.Database = v + case "ingestiontype": + e.IngestionType = v + case "tablename": + e.TableName = v + case "createtables": + switch v { + case "true": + e.CreateTables = true + case "false": + e.CreateTables = false + default: + return fmt.Errorf("invalid setting %q for %q", v, k) + } + case "metricsgroupingtype": + if v != adx.TablePerMetric && v != adx.SingleTable { + return errors.New("metrics grouping type is not valid:" + v) + } + e.MetricsGrouping = v + } + } + + // Setup the JSON serializer + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + if err := serializer.Init(); err != nil { + return fmt.Errorf("initializing JSON serializer failed: %w", err) + } + e.serializer = serializer + + return nil +} + +func (e *eventhouse) Connect() error { + client, err := e.NewClient("Kusto.Telegraf", e.log) + if err != nil { + return fmt.Errorf("creating new client failed: %w", err) + } + e.client = client + + return nil +} + +func (e *eventhouse) Write(metrics []telegraf.Metric) error { + if e.MetricsGrouping == adx.TablePerMetric { + return e.writeTablePerMetric(metrics) + } + return e.writeSingleTable(metrics) +} + +func (e *eventhouse) Close() error { + return e.client.Close() +} + +func (e *eventhouse) writeTablePerMetric(metrics []telegraf.Metric) error { + tableMetricGroups := make(map[string][]byte) + // Group metrics by name and serialize them + for _, m := range metrics { + tableName := m.Name() + metricInBytes, err := e.serializer.Serialize(m) + if err != nil { + return err + } + if existingBytes, ok := tableMetricGroups[tableName]; ok { + tableMetricGroups[tableName] = append(existingBytes, 
metricInBytes...) + } else { + tableMetricGroups[tableName] = metricInBytes + } + } + + // Push the metrics for each table + format := ingest.FileFormat(ingest.JSON) + for tableName, tableMetrics := range tableMetricGroups { + if err := e.client.PushMetrics(format, tableName, tableMetrics); err != nil { + return err + } + } + + return nil +} + +func (e *eventhouse) writeSingleTable(metrics []telegraf.Metric) error { + // serialise each metric in metrics - store in byte[] + metricsArray := make([]byte, 0) + for _, m := range metrics { + metricsInBytes, err := e.serializer.Serialize(m) + if err != nil { + return err + } + metricsArray = append(metricsArray, metricsInBytes...) + } + + // push metrics to a single table + format := ingest.FileFormat(ingest.JSON) + err := e.client.PushMetrics(format, e.TableName, metricsArray) + return err +} + +func isEventhouseEndpoint(endpoint string) bool { + prefixes := []string{ + "data source=", + "addr=", + "address=", + "network address=", + "server=", + } + + ep := strings.ToLower(endpoint) + return slices.ContainsFunc(prefixes, func(prefix string) bool { + return strings.HasPrefix(ep, prefix) + }) +} diff --git a/plugins/outputs/microsoft_fabric/event_house_test.go b/plugins/outputs/microsoft_fabric/event_house_test.go new file mode 100644 index 0000000000000..2d5f3277af58d --- /dev/null +++ b/plugins/outputs/microsoft_fabric/event_house_test.go @@ -0,0 +1,108 @@ +package microsoft_fabric + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/common/adx" + "github.com/influxdata/telegraf/testutil" +) + +func TestEventHouseConnectSuccess(t *testing.T) { + tests := []struct { + name string + endpoint string + database string + }{ + { + name: "valid configuration", + endpoint: "addr=https://example.com", + database: "testdb", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup plugin + plugin := &eventhouse{ + connectionString: 
tt.endpoint, + Config: adx.Config{ + Database: tt.database, + }, + log: testutil.Logger{}, + } + require.NoError(t, plugin.init()) + + // Check for successful connection and client creation + require.NoError(t, plugin.Connect()) + require.NotNil(t, plugin.client) + // Clean up resources + require.NoError(t, plugin.Close()) + }) + } +} + +func TestIsEventhouseEndpoint(t *testing.T) { + tests := []struct { + name string + endpoint string + }{ + { + name: "data source prefix", + endpoint: "data source=https://example.com", + }, + { + name: "address prefix", + endpoint: "address=https://example.com", + }, + { + name: "network address prefix", + endpoint: "network address=https://example.com", + }, + { + name: "server prefix", + endpoint: "server=https://example.com", + }, + { + name: "case insensitive prefix", + endpoint: "DATA SOURCE=https://example.com", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.True(t, isEventhouseEndpoint(tt.endpoint)) + }) + } +} + +func TestIsNotEventhouseEndpoint(t *testing.T) { + tests := []struct { + name string + endpoint string + }{ + { + name: "Invalid prefix", + endpoint: "invalid=https://example.com", + }, + { + name: "Empty string", + endpoint: "", + }, + { + name: "Just URL", + endpoint: "https://example.com", + }, + { + name: "eventstream endpoint", + endpoint: "Endpoint=sb://example.com", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.False(t, isEventhouseEndpoint(tt.endpoint)) + }) + } +} diff --git a/plugins/outputs/microsoft_fabric/event_stream.go b/plugins/outputs/microsoft_fabric/event_stream.go new file mode 100644 index 0000000000000..74e104f6f867d --- /dev/null +++ b/plugins/outputs/microsoft_fabric/event_stream.go @@ -0,0 +1,197 @@ +//go:generate ../../../tools/readme_config_includer/generator +package microsoft_fabric + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + 
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/serializers/json" +) + +type eventstream struct { + connectionString string + timeout config.Duration + log telegraf.Logger + + partitionKey string + maxMessageSize config.Size + + client *azeventhubs.ProducerClient + options azeventhubs.EventDataBatchOptions + serializer telegraf.Serializer +} + +func (e *eventstream) init() error { + // Parse the connection string by splitting it into key-value pairs + // and extract the extra keys used for plugin configuration + pairs := strings.Split(e.connectionString, ";") + for _, pair := range pairs { + // Skip empty pairs + if strings.TrimSpace(pair) == "" { + continue + } + // Split each pair into key and value + k, v, found := strings.Cut(pair, "=") + if !found { + return fmt.Errorf("invalid connection string format: %q", pair) + } + + // Only lowercase the keys as the values might be case sensitive + k = strings.ToLower(strings.TrimSpace(k)) + v = strings.TrimSpace(v) + + key := strings.ReplaceAll(k, " ", "") + switch key { + case "partitionkey": + e.partitionKey = v + case "maxmessagesize": + msgsize, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return fmt.Errorf("invalid max message size: %w", err) + } + if msgsize > 0 { + e.options.MaxBytes = msgsize + } + } + } + + // Setup the JSON serializer + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + if err := serializer.Init(); err != nil { + return fmt.Errorf("setting up JSON serializer failed: %w", err) + } + e.serializer = serializer + + return nil +} + +func (e *eventstream) Connect() error { + cfg := &azeventhubs.ProducerClientOptions{ + ApplicationID: internal.FormatFullVersion(), + RetryOptions: azeventhubs.RetryOptions{MaxRetries: -1}, + } + + client, 
err := azeventhubs.NewProducerClientFromConnectionString(e.connectionString, "", cfg) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + e.client = client + + return nil +} + +func (e *eventstream) Close() error { + if e.client == nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.timeout)) + defer cancel() + + return e.client.Close(ctx) +} + +func (e *eventstream) Write(metrics []telegraf.Metric) error { + // This context is only used for creating the batches which should not timeout as this is + // not an I/O operation. Therefore avoid setting a timeout here. + ctx := context.Background() + + // Iterate over the metrics and group them to batches + batchOptions := e.options + batches := make(map[string]*azeventhubs.EventDataBatch) + for i := 0; i < len(metrics); i++ { + m := metrics[i] + + // Prepare the payload + payload, err := e.serializer.Serialize(m) + if err != nil { + e.log.Errorf("Could not serialize metric: %v", err) + e.log.Tracef("metric: %+v", m) + continue + } + + // Get the batcher for the chosen partition + partition := "" + if e.partitionKey != "" { + if key, ok := m.GetTag(e.partitionKey); ok { + partition = key + } else if key, ok := m.GetField(e.partitionKey); ok { + if k, ok := key.(string); ok { + partition = k + } + } + } + batchOptions.PartitionKey = &partition + if _, found := batches[partition]; !found { + batches[partition], err = e.client.NewEventDataBatch(ctx, &batchOptions) + if err != nil { + return fmt.Errorf("creating batch for partition %q failed: %w", partition, err) + } + } + + // Add the event to the partition and send it if the batch is full + err = batches[partition].AddEventData(&azeventhubs.EventData{Body: payload}, nil) + if err == nil { + continue + } + + // If the event doesn't fit into the batch anymore, send the batch + if !errors.Is(err, azeventhubs.ErrEventDataTooLarge) { + return fmt.Errorf("adding metric to batch for partition %q failed: 
%w", partition, err) + } + + // The event is larger than the maximum allowed size so there + // is nothing we can do here but have to drop the metric. + if batches[partition].NumEvents() == 0 { + e.log.Errorf("Metric with %d bytes exceeds the maximum allowed size and must be dropped!", len(payload)) + e.log.Tracef("metric: %+v", m) + continue + } + if err := e.send(ctx, batches[partition]); err != nil { + return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) + } + + // Create a new metric and reiterate over the current metric to be + // added in the next iteration of the for loop. + batches[partition], err = e.client.NewEventDataBatch(ctx, &e.options) + if err != nil { + return fmt.Errorf("creating batch for partition %q failed: %w", partition, err) + } + i-- + } + + // Send the remaining batches that never exceeded the batch size + for partition, batch := range batches { + if batch.NumBytes() == 0 { + continue + } + if err := e.send(ctx, batch); err != nil { + return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) + } + } + + return nil +} + +func (e *eventstream) send(ctx context.Context, batch *azeventhubs.EventDataBatch) error { + ctx, cancel := context.WithTimeout(ctx, time.Duration(e.timeout)) + defer cancel() + + return e.client.SendEventDataBatch(ctx, batch, nil) +} + +func isEventstreamEndpoint(endpoint string) bool { + return strings.HasPrefix(strings.ToLower(endpoint), "endpoint=sb") +} diff --git a/plugins/outputs/microsoft_fabric/event_stream_test.go b/plugins/outputs/microsoft_fabric/event_stream_test.go new file mode 100644 index 0000000000000..4bfd1e6eb9180 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/event_stream_test.go @@ -0,0 +1,59 @@ +package microsoft_fabric + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIsEventstreamEndpoint(t *testing.T) { + tests := []struct { + name string + endpoint string + }{ + { + name: "endpoint prefix", + endpoint: 
"Endpoint=sb://example.com", + }, + { + name: "case insensitive prefix", + endpoint: "Endpoint=sb://example.com", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.True(t, isEventstreamEndpoint(tt.endpoint)) + }) + } +} + +func TestIsNotEventstreamEndpoint(t *testing.T) { + tests := []struct { + name string + endpoint string + }{ + { + name: "Invalid prefix", + endpoint: "invalid=https://example.com", + }, + { + name: "Empty string", + endpoint: "", + }, + { + name: "Just URL", + endpoint: "https://example.com", + }, + { + name: "eventhouse endpoint", + endpoint: "data source=https://example.com", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.False(t, isEventstreamEndpoint(tt.endpoint)) + }) + } +} diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric.go b/plugins/outputs/microsoft_fabric/microsoft_fabric.go new file mode 100644 index 0000000000000..ebf5b5762a074 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric.go @@ -0,0 +1,94 @@ +//go:generate ../../../tools/readme_config_includer/generator +package microsoft_fabric + +import ( + _ "embed" + "errors" + "fmt" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/adx" + "github.com/influxdata/telegraf/plugins/outputs" +) + +//go:embed sample.conf +var sampleConfig string + +type fabric interface { + Connect() error + Write(metrics []telegraf.Metric) error + Close() error +} + +type MicrosoftFabric struct { + ConnectionString string `toml:"connection_string"` + Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` + + output fabric +} + +func (*MicrosoftFabric) SampleConfig() string { + return sampleConfig +} + +func (m *MicrosoftFabric) Init() error { + // Check input parameters + if m.ConnectionString == "" { + return errors.New("endpoint must not be empty") + } + + // Initialize the output 
fabric dependent on the type + switch { + case isEventstreamEndpoint(m.ConnectionString): + m.Log.Debug("Detected EventStream endpoint...") + eventstream := &eventstream{ + connectionString: m.ConnectionString, + timeout: m.Timeout, + log: m.Log, + } + if err := eventstream.init(); err != nil { + return fmt.Errorf("initializing EventStream output failed: %w", err) + } + m.output = eventstream + case isEventhouseEndpoint(strings.ToLower(m.ConnectionString)): + m.Log.Debug("Detected EventHouse endpoint...") + eventhouse := &eventhouse{ + connectionString: m.ConnectionString, + Config: adx.Config{ + Timeout: m.Timeout, + }, + log: m.Log, + } + if err := eventhouse.init(); err != nil { + return fmt.Errorf("initializing EventHouse output failed: %w", err) + } + m.output = eventhouse + default: + return errors.New("invalid connection string: unable to detect endpoint type (EventStream or EventHouse)") + } + return nil +} + +func (m *MicrosoftFabric) Close() error { + return m.output.Close() +} + +func (m *MicrosoftFabric) Connect() error { + return m.output.Connect() +} + +func (m *MicrosoftFabric) Write(metrics []telegraf.Metric) error { + return m.output.Write(metrics) +} + +func init() { + outputs.Add("microsoft_fabric", func() telegraf.Output { + return &MicrosoftFabric{ + Timeout: config.Duration(30 * time.Second), + } + }) +} diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go b/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go new file mode 100644 index 0000000000000..df1849c4b5b87 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go @@ -0,0 +1,249 @@ +package microsoft_fabric + +import ( + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/adx" + "github.com/influxdata/telegraf/testutil" +) + +func TestInitFail(t *testing.T) { + tests := []struct { 
+ name string + connection string + expected string + }{ + { + name: "empty connection string", + expected: "endpoint must not be empty", + }, + { + name: "invalid connection string format", + connection: "invalid=format", + expected: "invalid connection string", + }, + { + name: "malformed connection string", + connection: "endpoint=;key=;", + expected: "invalid connection string", + }, + { + name: "invalid eventhouse connection string format", + connection: "data source=https://example.kusto.windows.net;invalid_param", + expected: "invalid connection string format", + }, + { + name: "invalid eventhouse metrics grouping type", + connection: "data source=https://example.com;metrics grouping type=Invalid", + expected: "metrics grouping type is not valid:Invalid", + }, + { + name: "invalid eventhouse create tables value", + connection: "data source=https://example.com;database=mydb;create tables=invalid", + expected: "invalid setting", + }, + { + name: "invalid eventstream connection format", + connection: "Endpoint=sb://namespace.servicebus.windows.net/;invalid_param", + expected: "invalid connection string format", + }, + { + name: "invalid eventstream max message size", + connection: "Endpoint=sb://namespace.servicebus.windows.net/;maxmessagesize=-4", + expected: "invalid max message size", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the plugin + plugin := &MicrosoftFabric{ + ConnectionString: tt.connection, + Log: testutil.Logger{}, + } + + // Check the returned error + require.ErrorContains(t, plugin.Init(), tt.expected) + }) + } +} + +func TestInitEventHouse(t *testing.T) { + tests := []struct { + name string + connection string + timeout config.Duration + expected adx.Config + }{ + { + name: "valid connection", + connection: "data source=https://example.kusto.windows.net;Database=testdb", + expected: adx.Config{ + Endpoint: "https://example.kusto.windows.net", + Database: "testdb", + CreateTables: true, + Timeout: 
config.Duration(30 * time.Second), + }, + }, + { + name: "connection with timeout", + connection: "data source=https://example.kusto.windows.net;Database=testdb", + timeout: config.Duration(60 * time.Second), + expected: adx.Config{ + Endpoint: "https://example.kusto.windows.net", + Database: "testdb", + CreateTables: true, + Timeout: config.Duration(60 * time.Second), + }, + }, + { + name: "connection with all parameters", + connection: "data source=https://example.com;database=mydb;table name=mytable;create tables=true;metrics grouping type=tablepermetric", + expected: adx.Config{ + Endpoint: "https://example.com", + Database: "mydb", + TableName: "mytable", + MetricsGrouping: "tablepermetric", + CreateTables: true, + Timeout: config.Duration(30 * time.Second), + }, + }, + { + name: "case insensitive parameters", + connection: "DATA SOURCE=https://example.com;DATABASE=mydb", + expected: adx.Config{ + Endpoint: "https://example.com", + Database: "mydb", + CreateTables: true, + Timeout: config.Duration(30 * time.Second), + }, + }, + { + name: "server parameter instead of data source", + connection: "server=https://example.com;database=mydb", + expected: adx.Config{ + Endpoint: "https://example.com", + Database: "mydb", + CreateTables: true, + Timeout: config.Duration(30 * time.Second), + }, + }, + { + name: "create tables parameter true", + connection: "data source=https://example.com;database=mydb;create tables=true", + expected: adx.Config{ + Endpoint: "https://example.com", + Database: "mydb", + CreateTables: true, + Timeout: config.Duration(30 * time.Second), + }, + }, + { + name: "create tables parameter false", + connection: "data source=https://example.com;database=mydb;create tables=false", + expected: adx.Config{ + Endpoint: "https://example.com", + Database: "mydb", + CreateTables: false, + Timeout: config.Duration(30 * time.Second), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup the plugin + plugin := 
&MicrosoftFabric{ + ConnectionString: tt.connection, + Timeout: config.Duration(30 * time.Second), // default set by init() + Log: testutil.Logger{}, + } + if tt.timeout > 0 { + plugin.Timeout = tt.timeout + } + require.NoError(t, plugin.Init()) + + // Check the created plugin + require.NotNil(t, plugin.output, "active plugin should have been set") + ap, ok := plugin.output.(*eventhouse) + require.Truef(t, ok, "expected evenhouse plugin but got %T", plugin.output) + require.Equal(t, tt.expected, ap.Config) + }) + } +} + +func TestInitEventStream(t *testing.T) { + tests := []struct { + name string + connection string + timeout config.Duration + expected eventstream + }{ + { + name: "valid connection", + connection: "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=key", + expected: eventstream{ + connectionString: "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=key", + timeout: config.Duration(30 * time.Second), + }, + }, + { + name: "connection with timeout", + connection: "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=key", + timeout: config.Duration(60 * time.Second), + expected: eventstream{ + connectionString: "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=key", + timeout: config.Duration(60 * time.Second), + }, + }, + { + name: "connection with partition key and message size", + connection: "Endpoint=sb://example.com;partitionkey=mykey;maxmessagesize=1024", + expected: eventstream{ + connectionString: "Endpoint=sb://example.com;partitionkey=mykey;maxmessagesize=1024", + partitionKey: "mykey", + options: azeventhubs.EventDataBatchOptions{MaxBytes: 1024}, + timeout: config.Duration(30 * time.Second), + }, + }, { + name: "case insensitive keys", + connection: "endpoint=sb://example.com;PARTITIONKEY=mykey;MaxMessageSize=1024", + expected: eventstream{ + connectionString: 
"endpoint=sb://example.com;PARTITIONKEY=mykey;MaxMessageSize=1024", + partitionKey: "mykey", + options: azeventhubs.EventDataBatchOptions{MaxBytes: 1024}, + timeout: config.Duration(30 * time.Second), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup plugin + plugin := &MicrosoftFabric{ + ConnectionString: tt.connection, + Timeout: config.Duration(30 * time.Second), // default set by init() + Log: testutil.Logger{}, + } + if tt.timeout > 0 { + plugin.Timeout = tt.timeout + } + + require.NoError(t, plugin.Init()) + + // Check the created plugin + require.NotNil(t, plugin.output, "active plugin should have been set") + ap, ok := plugin.output.(*eventstream) + require.Truef(t, ok, "expected evenstream plugin but got %T", plugin.output) + require.Equal(t, tt.expected.connectionString, ap.connectionString) + require.Equal(t, tt.expected.timeout, ap.timeout) + require.Equal(t, tt.expected.partitionKey, ap.partitionKey) + require.Equal(t, tt.expected.maxMessageSize, ap.maxMessageSize) + }) + } +} diff --git a/plugins/outputs/microsoft_fabric/sample.conf b/plugins/outputs/microsoft_fabric/sample.conf new file mode 100644 index 0000000000000..6707bcc74c277 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/sample.conf @@ -0,0 +1,7 @@ +# Sends metrics to Microsoft Fabric +[[outputs.microsoft_fabric]] + ## The URI property of the resource on Azure + connection_string = "https://trd-abcd.xx.kusto.fabric.microsoft.com;Database=kusto_eh;Table Name=telegraf_dump;Key=value" + + ## Client timeout + # timeout = "30s"