diff --git a/Cargo.lock b/Cargo.lock index 1dd31d61..c2740a2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4009,6 +4009,7 @@ dependencies = [ "prost-build 0.13.5", "pyo3", "rand 0.8.5", + "rayon", "rdkafka", "read-restrict", "regex", diff --git a/Cargo.toml b/Cargo.toml index dd6d2875..35ed9cff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ bstr = "1.12.0" indexmap = "2.9.0" hmac = "0.12" sha2 = "0.10" +rayon = { version = "1.10.0", optional = true } regex = "1.11.1" rdkafka = { version = "0.38", features = [ "tokio", @@ -115,7 +116,7 @@ prost-build = "0.13.4" default = ["rdkafka", "aws_iam"] pprof = ["dep:pprof"] pyo3 = ["dep:pyo3", "dep:rotel_python_processor_sdk"] -rdkafka = ["dep:rdkafka"] +rdkafka = ["dep:rdkafka", "dep:rayon"] file_exporter = ["dep:parquet", "dep:arrow", "dep:base64"] aws_iam = ["dep:aws-config", "dep:aws-credential-types"] prometheus = ["dep:opentelemetry-prometheus-text-exporter"] diff --git a/src/init/agent.rs b/src/init/agent.rs index 0751172a..f9ccd3f2 100644 --- a/src/init/agent.rs +++ b/src/init/agent.rs @@ -906,7 +906,7 @@ impl Agent { kafka_offset_committer = kafka.take_offset_committer(); let receivers_cancel = receivers_cancel.clone(); - receivers_task_set.spawn(async move { kafka.run(receivers_cancel).await }); + receivers_task_set.spawn(kafka.run(receivers_cancel)); } } } diff --git a/src/receivers/kafka/receiver.rs b/src/receivers/kafka/receiver.rs index 669d6966..4f4982de 100644 --- a/src/receivers/kafka/receiver.rs +++ b/src/receivers/kafka/receiver.rs @@ -1,17 +1,13 @@ use crate::bounded_channel::bounded; use crate::receivers::kafka::config::{DeserializationFormat, KafkaReceiverConfig}; -use crate::receivers::kafka::error::{KafkaReceiverError, Result}; +use crate::receivers::kafka::error::KafkaReceiverError; use crate::receivers::kafka::offset_ack_committer::{KafkaOffsetCommitter, commit_offset}; use crate::receivers::kafka::offset_tracker::TopicTrackers; use crate::receivers::otlp_output::OTLPOutput; use crate::topology::payload; use crate::topology::payload::{KafkaMetadata, MessageMetadata}; -use bytes::Bytes; +use futures::FutureExt; use futures::stream::StreamExt; -use futures_util::stream::FuturesOrdered; -use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; -use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::logs::v1::{LogsData, ResourceLogs}; use opentelemetry_proto::tonic::metrics::v1::{MetricsData, ResourceMetrics}; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, TracesData}; @@ -19,19 +15,14 @@ use rdkafka::Message; use rdkafka::client::ClientContext; use rdkafka::config::FromClientConfigAndContext; use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext, Rebalance, StreamConsumer}; +use rdkafka::error::KafkaResult; use std::collections::{HashMap, HashSet}; use std::error::Error; -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use tokio::select; -use tokio::task::JoinError; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; -#[rustfmt::skip] -type DecodingFuture = Pin>, JoinError>> + Send>>; - // In the future if we support arbitrary topics with non OTLP data we might replace these // with a map. const TRACES_TOPIC_ID: u8 = 0; @@ -190,19 +181,18 @@ impl ConsumerContext for KafkaConsumerContext { } } -#[allow(dead_code)] // Just to stop warning on unused metadata -enum DecodedResult { +enum DecodeType<'a> { Traces { - resources: Vec, - metadata: KafkaMetadata, + payload: payload::Message, + output: &'a OTLPOutput>, }, Metrics { - resources: Vec, - metadata: KafkaMetadata, + payload: payload::Message, + output: &'a OTLPOutput>, }, Logs { - resources: Vec, - metadata: KafkaMetadata, + payload: payload::Message, + output: &'a OTLPOutput>, }, } @@ -219,7 +209,6 @@ pub struct KafkaReceiver { pub ack_sender: crate::bounded_channel::BoundedSender, auto_commit: bool, offset_committer: Option, - decoding_futures: FuturesOrdered, } impl KafkaReceiver { @@ -229,7 +218,7 @@ impl KafkaReceiver { metrics_output: Option>>, logs_output: Option>>, finite_retry_enabled: bool, - ) -> Result { + ) -> Result { // Get the list of topics to subscribe to let topics = config.get_topics(); @@ -318,7 +307,6 @@ impl KafkaReceiver { metrics_topic, logs_topic, format: config.deserialization_format, - decoding_futures: FuturesOrdered::new(), topic_trackers, ack_sender, auto_commit: is_auto_commit, @@ -326,35 +314,11 @@ impl KafkaReceiver { }) } - fn spawn_decode( - &mut self, - data: Vec, - metadata: KafkaMetadata, - extract_resources: impl Fn(T) -> Vec + Send + 'static, - make_result: impl Fn(Vec, KafkaMetadata) -> DecodedResult + Send + 'static, - ) where - T: serde::de::DeserializeOwned + prost::Message + Default + Send + 'static, - R: Send + 'static, - { - let format = self.format; - - let f = tokio::task::spawn_blocking(move || { - match Self::decode_kafka_message::(data, format) { - Ok(req) => { - let resources = extract_resources(req); - Ok(make_result(resources, metadata)) - } - Err(e) => Err(e), - } - }); - self.decoding_futures.push_back(Box::pin(f)); - } - // Static method for decoding Kafka messages fn decode_kafka_message( - data: Vec, + data: &[u8], format: DeserializationFormat, - ) -> std::result::Result> + ) -> Result> where T: serde::de::DeserializeOwned + prost::Message + Default, { @@ -363,7 +327,7 @@ impl KafkaReceiver { debug!("Failed to decode {}", e); e })?, - DeserializationFormat::Protobuf => T::decode(Bytes::from(data)).map_err(|e| { + DeserializationFormat::Protobuf => T::decode(data).map_err(|e| { debug!("Failed to decode {}", e); e })?, @@ -371,202 +335,60 @@ impl KafkaReceiver { Ok(request) } - // Helper method to send messages with cancellation support - async fn send_with_cancellation( - output: &OTLPOutput>, - message: payload::Message, - cancel_token: &CancellationToken, - signal_type: &str, - ) -> Result<()> - where - T: Send + 'static, - { - // Use send_async which returns a future we can select against - // This avoids both cloning and spinning - proper async coordination - let send_fut = output.send_async(message); - tokio::pin!(send_fut); - - select! { - result = send_fut => { - match result { - Ok(()) => Ok(()), - Err(_e) => { - // flume::SendError means channel disconnected - warn!("Failed to send {} to pipeline: channel disconnected", signal_type); - Err(KafkaReceiverError::SendFailed { - signal_type: signal_type.to_string(), - error: "Channel disconnected".to_string(), - }) - } - } - } - _ = cancel_token.cancelled() => { - debug!("Received cancellation signal while waiting to send {}", signal_type); - Err(KafkaReceiverError::SendCancelled) - } - } - } - pub fn take_offset_committer(&mut self) -> Option { self.offset_committer.take() } pub(crate) async fn run( - &mut self, + self, receivers_cancel: CancellationToken, - ) -> std::result::Result<(), Box> { + ) -> Result<(), Box> { debug!("Starting Kafka receiver"); // The consumer will automatically start from the position defined by auto.offset.reset // which is set to "earliest" by default in the config - let subscription = self.consumer.subscription().unwrap(); + let consumer = self.consumer.clone(); + let subscription = consumer.subscription()?; debug!("Initial subscriptions: {:?}", subscription); + // Create a reusable buffer for batching messages + let mut batch = Vec::with_capacity(MAX_CONCURRENT_DECODERS); + let stream = consumer.stream(); + // Pin the stream so we can call poll_next on it + tokio::pin!(stream); + loop { + // Clear the batch from the previous iteration. We don't really need this + // as we're draining the buffer, but it's still a safe guard + batch.clear(); + + // 1. Wait for the first message (or cancellation) select! { - record = self.consumer.recv(), if self.decoding_futures.len() < MAX_CONCURRENT_DECODERS => { - match record { - Ok(m) => { - let topic = m.topic().to_string(); - let partition = m.partition(); - let offset = m.offset(); - - debug!("Message received - topic: {}, partition: {}, offset: {}", topic, partition, offset); - - match m.payload() { - None => debug!("Empty payload from Kafka"), - Some(data) => { - let data = data.to_vec(); - match topic.as_str() { - t if t == self.traces_topic => { - // Track the offset when we receive it - self.topic_trackers.track(TRACES_TOPIC_ID, partition, offset); - let metadata = KafkaMetadata::new(offset, partition, TRACES_TOPIC_ID, Some(self.ack_sender.clone())); - match self.format { - DeserializationFormat::Json => { - self.spawn_decode( - data, metadata, - |req: TracesData| req.resource_spans, - |resources, metadata| DecodedResult::Traces { resources, metadata } - ); - } - DeserializationFormat::Protobuf => { - self.spawn_decode( - data, metadata, - |req: ExportTraceServiceRequest| req.resource_spans, - |resources, metadata| DecodedResult::Traces { resources, metadata } - ); - } - } - } - t if t == self.metrics_topic => { - // Track the offset when we receive it - self.topic_trackers.track(METRICS_TOPIC_ID, partition, offset); - let metadata = KafkaMetadata::new(offset, partition, METRICS_TOPIC_ID, Some(self.ack_sender.clone())); - match self.format { - DeserializationFormat::Json => { - self.spawn_decode( - data, metadata, - |req: MetricsData| req.resource_metrics, - |resources, metadata| DecodedResult::Metrics { resources, metadata } - ); - } - DeserializationFormat::Protobuf => { - self.spawn_decode( - data, metadata, - |req: ExportMetricsServiceRequest| req.resource_metrics, - |resources, metadata| DecodedResult::Metrics { resources, metadata } - ); - } - } - } - t if t == self.logs_topic => { - // Track the offset when we receive it - self.topic_trackers.track(LOGS_TOPIC_ID, partition, offset); - let metadata = KafkaMetadata::new(offset, partition, LOGS_TOPIC_ID, Some(self.ack_sender.clone())); - match self.format { - DeserializationFormat::Json => { - self.spawn_decode( - data, metadata, - |req: LogsData| req.resource_logs, - |resources, metadata| DecodedResult::Logs { resources, metadata } - ); - } - DeserializationFormat::Protobuf => { - self.spawn_decode( - data, metadata, - |req: ExportLogsServiceRequest| req.resource_logs, - |resources, metadata| DecodedResult::Logs { resources, metadata } - ); - } - } - } - _ => { - debug!("Received data from kafka for unknown topic: {}", topic); - } - } - } - } - } - Err(e) => info!("Error reading from Kafka: {:?}", e), - } - }, - // Process completed decoding futures - decoded_result = self.decoding_futures.select_next_some(), if !self.decoding_futures.is_empty() => { - match decoded_result { - - Ok(decode_result) => { - match decode_result { - Ok(decoded) => { - match decoded { - DecodedResult::Traces { resources, metadata } => { - if let Some(ref output) = self.traces_output { - let md = match self.auto_commit { - true => None, - false => Some(MessageMetadata::kafka(metadata)), - }; - let message = payload::Message::new(md, resources); - if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, "traces").await { - break; - } - } - } - DecodedResult::Metrics { resources, metadata } => { - if let Some(ref output) = self.metrics_output { - let md = match self.auto_commit { - true => None, - false => Some(MessageMetadata::kafka(metadata)), - }; - let message = payload::Message::new(md, resources); - if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, "metrics").await { - break; - } - } - } - DecodedResult::Logs { resources, metadata } => { - if let Some(ref output) = self.logs_output { - let md = match self.auto_commit { - true => None, - false => Some(MessageMetadata::kafka(metadata)), - }; - let message = payload::Message::new(md, resources); - if let Err(KafkaReceiverError::SendCancelled) = Self::send_with_cancellation(output, message, &receivers_cancel, "logs").await { - break; - } - } - } - } - } - Err(e) => { - error!("Failed to decode Kafka message: {}", e); - } - } - } - Err(e) => { - warn!("Decoding task failed: {}", e); - } - } - }, + maybe_msg = stream.next() => { + let Some(msg) = maybe_msg else { + debug!("Kafka stream ended"); + break; + }; + batch.push(msg); + } + _ = receivers_cancel.cancelled() => { + debug!("Kafka receiver cancelled, shutting down"); + break; + } + } + + // 2. Greedily pull any other immediately ready messages up to the limit + while batch.len() < MAX_CONCURRENT_DECODERS { + // Check if the next message is ready without blocking + match stream.next().now_or_never() { + Some(Some(msg)) => batch.push(msg), + _ => break, + } + } + + // 3. Process the batch (or cancellation) + select! { + _ = self.process_batch(&mut batch) => {}, _ = receivers_cancel.cancelled() => { debug!("Kafka receiver cancelled, shutting down"); break; @@ -576,6 +398,180 @@ impl KafkaReceiver { Ok(()) } + + async fn process_batch(&self, batch: &mut Vec>) + where + M: Message + Send, + { + use rayon::prelude::*; + + let messages = batch + .par_drain(..) + .filter_map(|message| self.process_message(message)) + .collect::>(); + + for decoded in messages { + match decoded { + DecodeType::Traces { payload, output } => { + if output.send(payload).await.is_err() { + warn!("Failed to send traces to pipeline: channel disconnected"); + } + } + DecodeType::Metrics { payload, output } => { + if output.send(payload).await.is_err() { + warn!("Failed to send metrics to pipeline: channel disconnected"); + } + } + DecodeType::Logs { payload, output } => { + if output.send(payload).await.is_err() { + warn!("Failed to send logs to pipeline: channel disconnected"); + } + } + } + } + } + + fn process_message(&self, message: KafkaResult) -> Option> + where + M: Message, + { + let message = match message { + Ok(message) => message, + Err(e) => { + info!("Error reading from Kafka: {:?}", e); + return None; + } + }; + + let topic = message.topic(); + let partition = message.partition(); + let offset = message.offset(); + + debug!(topic, partition, offset, "Message received"); + + let Some(data) = message.payload() else { + debug!("Empty payload from Kafka"); + return None; + }; + + match topic { + t if t == self.traces_topic => self._process_message::( + data, + offset, + partition, + self.traces_output.as_ref(), + )?, + t if t == self.metrics_topic => self._process_message::( + data, + offset, + partition, + self.metrics_output.as_ref(), + )?, + t if t == self.logs_topic => self._process_message::( + data, + offset, + partition, + self.logs_output.as_ref(), + )?, + _ => { + debug!("Received data from kafka for unknown topic: {}", topic); + + None + } + } + } + + fn _process_message<'a, T>( + &self, + data: &[u8], + offset: i64, + partition: i32, + output: Option<&'a OTLPOutput>>, + ) -> Option>> + where + T: OtlpResourceProvider, + { + self.topic_trackers.track(T::TOPIC_ID, partition, offset); + + let output = output?; + + let md = (!self.auto_commit).then(|| { + MessageMetadata::kafka(KafkaMetadata { + offset, + partition, + topic_id: TracesData::TOPIC_ID, + ack_chan: Some(self.ack_sender.clone()), + }) + }); + + match Self::decode_kafka_message::(data, self.format) { + Ok(payload) => Some(Some(payload.extract(md, output))), + Err(e) => { + error!("Failed to process Kafka message: {}", e); + None + } + } + } +} + +// Handy trait for extracting OTLP resources from different signal types (Traces, Metrics, Logs). +trait OtlpResourceProvider: serde::de::DeserializeOwned + prost::Message + Default { + const TOPIC_ID: u8; + type ResourceType: Send + 'static; + + fn extract<'a>( + self, + md: Option, + output: &'a OTLPOutput>, + ) -> DecodeType<'a>; +} + +impl OtlpResourceProvider for TracesData { + const TOPIC_ID: u8 = TRACES_TOPIC_ID; + type ResourceType = ResourceSpans; + + fn extract<'a>( + self, + md: Option, + output: &'a OTLPOutput>, + ) -> DecodeType<'a> { + DecodeType::Traces { + payload: payload::Message::new(md, self.resource_spans), + output, + } + } +} + +impl OtlpResourceProvider for MetricsData { + const TOPIC_ID: u8 = METRICS_TOPIC_ID; + type ResourceType = ResourceMetrics; + + fn extract<'a>( + self, + md: Option, + output: &'a OTLPOutput>, + ) -> DecodeType<'a> { + DecodeType::Metrics { + payload: payload::Message::new(md, self.resource_metrics), + output, + } + } +} + +impl OtlpResourceProvider for LogsData { + const TOPIC_ID: u8 = LOGS_TOPIC_ID; + type ResourceType = ResourceLogs; + + fn extract<'a>( + self, + md: Option, + output: &'a OTLPOutput>, + ) -> DecodeType<'a> { + DecodeType::Logs { + payload: payload::Message::new(md, self.resource_logs), + output, + } + } } #[cfg(test)] @@ -583,6 +579,9 @@ mod tests { use super::*; use crate::bounded_channel::bounded; use crate::receivers::kafka::config::AutoOffsetReset; + use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; + use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; + use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; @@ -594,6 +593,7 @@ mod tests { use prost::Message; use rdkafka::ClientConfig; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; + use rdkafka::message::BorrowedHeaders; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout; use std::time::Duration; @@ -775,6 +775,49 @@ mod tests { }] } + struct TestMessage { + payload: Vec, + topic: &'static str, + partition: i32, + offset: i64, + } + + impl rdkafka::Message for TestMessage { + type Headers = BorrowedHeaders; + + fn key(&self) -> Option<&[u8]> { + None + } + + fn payload(&self) -> Option<&[u8]> { + Some(&self.payload) + } + + unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> { + Some(&mut self.payload) + } + + fn topic(&self) -> &str { + self.topic + } + + fn partition(&self) -> i32 { + self.partition + } + + fn offset(&self) -> i64 { + self.offset + } + + fn timestamp(&self) -> rdkafka::Timestamp { + rdkafka::Timestamp::NotAvailable + } + + fn headers(&self) -> Option<&Self::Headers> { + None + } + } + #[tokio::test] async fn test_kafka_receiver_creation_with_valid_config() { let config = @@ -798,7 +841,7 @@ mod tests { test_data.encode(&mut buf).expect("Failed to encode"); let result = KafkaReceiver::decode_kafka_message::( - buf, + &buf, DeserializationFormat::Protobuf, ); @@ -822,7 +865,7 @@ mod tests { test_data.encode(&mut buf).expect("Failed to encode"); let result = KafkaReceiver::decode_kafka_message::( - buf, + &buf, DeserializationFormat::Protobuf, ); @@ -849,7 +892,7 @@ mod tests { test_data.encode(&mut buf).expect("Failed to encode"); let result = KafkaReceiver::decode_kafka_message::( - buf, + &buf, DeserializationFormat::Protobuf, ); @@ -868,7 +911,7 @@ mod tests { let json_data = serde_json::to_vec(&test_data).expect("Failed to encode JSON"); let result = KafkaReceiver::decode_kafka_message::( - json_data, + &json_data, DeserializationFormat::Json, ); @@ -889,7 +932,7 @@ mod tests { let json_data = serde_json::to_vec(&test_data).expect("Failed to encode JSON"); let result = KafkaReceiver::decode_kafka_message::( - json_data, + &json_data, DeserializationFormat::Json, ); @@ -911,8 +954,10 @@ mod tests { }; let json_data = serde_json::to_vec(&test_data).expect("Failed to encode JSON"); - let result = - KafkaReceiver::decode_kafka_message::(json_data, DeserializationFormat::Json); + let result = KafkaReceiver::decode_kafka_message::( + &json_data, + DeserializationFormat::Json, + ); assert!(result.is_ok()); let decoded = result.unwrap(); @@ -939,7 +984,7 @@ mod tests { let invalid_data = b"invalid protobuf data".to_vec(); let result = KafkaReceiver::decode_kafka_message::( - invalid_data, + &invalid_data, DeserializationFormat::Protobuf, ); @@ -956,7 +1001,7 @@ mod tests { let (tx, mut rx) = bounded::>(100); let traces_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) + let receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) .expect("Failed to create receiver"); // Create test data @@ -966,57 +1011,22 @@ mod tests { let mut buf = Vec::new(); test_data.encode(&mut buf).expect("Failed to encode"); - let metadata = KafkaMetadata::new(123, 0, TRACES_TOPIC_ID, None); - - // Spawn decode task - receiver.spawn_decode( - buf, - metadata, - |req: ExportTraceServiceRequest| req.resource_spans, - |resources, metadata| DecodedResult::Traces { - resources, - metadata, - }, - ); + let message = TestMessage { + payload: buf, + topic: "test-traces", + partition: 0, + offset: 0, + }; - // Process the futures manually since we can't run the full main loop - assert_eq!(receiver.decoding_futures.len(), 1); - - // Wait for the decode task to complete and get the result - let decoded_result = receiver.decoding_futures.select_next_some().await; - assert!(decoded_result.is_ok()); - - let inner_result = decoded_result.unwrap(); - assert!(inner_result.is_ok()); - - let decoded = inner_result.unwrap(); - match decoded { - DecodedResult::Traces { - resources, - metadata: _, - } => { - assert_eq!(resources.len(), 1); - assert_eq!(resources[0].scope_spans[0].spans[0].name, "test-span"); - - // Now test the result processing by sending to pipeline - if let Some(ref output) = receiver.traces_output { - let message = payload::Message { - metadata: None, - payload: resources, - }; - output.send(message).await.expect("Failed to send"); - } + receiver.process_batch(&mut vec![Ok(message)]).await; - // Verify we received the data on the output channel - let received = rx.next().await.expect("Failed to receive traces"); - assert_eq!(received.payload.len(), 1); - assert_eq!( - received.payload[0].scope_spans[0].spans[0].name, - "test-span" - ); - } - _ => panic!("Expected traces result"), - } + // Verify we received the data on the output channel + let received = rx.next().await.expect("Failed to receive traces"); + assert_eq!(received.payload.len(), 1); + assert_eq!( + received.payload[0].scope_spans[0].spans[0].name, + "test-span" + ); } #[tokio::test] @@ -1030,7 +1040,7 @@ mod tests { let (tx, mut rx) = bounded::>(100); let metrics_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, None, Some(metrics_output), None, false) + let receiver = KafkaReceiver::new(config, None, Some(metrics_output), None, false) .expect("Failed to create receiver"); // Create test data @@ -1039,54 +1049,22 @@ mod tests { }; let json_data = serde_json::to_vec(&test_data).expect("Failed to encode JSON"); - let metadata = KafkaMetadata::new(456, 1, METRICS_TOPIC_ID, None); - - // Test with JSON format - receiver.spawn_decode( - json_data, - metadata, - |req: ExportMetricsServiceRequest| req.resource_metrics, - |resources, metadata| DecodedResult::Metrics { - resources, - metadata, - }, - ); + let message = TestMessage { + payload: json_data, + topic: "test-metrics", + partition: 0, + offset: 0, + }; - // Process the future - let decoded_result = receiver.decoding_futures.select_next_some().await; - assert!(decoded_result.is_ok()); - - let inner_result = decoded_result.unwrap(); - assert!(inner_result.is_ok()); - - let decoded = inner_result.unwrap(); - match decoded { - DecodedResult::Metrics { - resources, - metadata: _, - } => { - assert_eq!(resources.len(), 1); - assert_eq!(resources[0].scope_metrics[0].metrics[0].name, "test.metric"); - - // Send to pipeline - if let Some(ref output) = receiver.metrics_output { - let message = payload::Message { - metadata: None, - payload: resources, - }; - output.send(message).await.expect("Failed to send"); - } + receiver.process_batch(&mut vec![Ok(message)]).await; - // Verify received data - let received = rx.next().await.expect("Failed to receive metrics"); - assert_eq!(received.payload.len(), 1); - assert_eq!( - received.payload[0].scope_metrics[0].metrics[0].name, - "test.metric" - ); - } - _ => panic!("Expected metrics result"), - } + // Verify received data + let received = rx.next().await.expect("Failed to receive metrics"); + assert_eq!(received.payload.len(), 1); + assert_eq!( + received.payload[0].scope_metrics[0].metrics[0].name, + "test.metric" + ); } #[test] @@ -1122,7 +1100,7 @@ mod tests { let (tx, mut rx) = bounded::>(100); let traces_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) + let receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) .expect("Failed to create receiver"); let cancel_token = CancellationToken::new(); @@ -1192,7 +1170,7 @@ mod tests { let (tx, mut rx) = bounded::>(100); let metrics_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, None, Some(metrics_output), None, false) + let receiver = KafkaReceiver::new(config, None, Some(metrics_output), None, false) .expect("Failed to create receiver"); let cancel_token = CancellationToken::new(); @@ -1259,7 +1237,7 @@ mod tests { let (tx, mut rx) = bounded::>(100); let logs_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, None, None, Some(logs_output), false) + let receiver = KafkaReceiver::new(config, None, None, Some(logs_output), false) .expect("Failed to create receiver"); let cancel_token = CancellationToken::new(); @@ -1346,7 +1324,7 @@ mod tests { bounded::>(100); let logs_output = OTLPOutput::new(logs_tx); - let mut receiver = KafkaReceiver::new( + let receiver = KafkaReceiver::new( config, Some(traces_output), Some(metrics_output), @@ -1465,7 +1443,7 @@ mod tests { let (tx, _rx) = bounded::>(100); let traces_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) + let receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) .expect("Failed to create receiver"); let cancel_token = CancellationToken::new(); @@ -1516,7 +1494,7 @@ mod tests { let (tx, _rx) = bounded::>(100); let traces_output = OTLPOutput::new(tx); - let mut receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) + let receiver = KafkaReceiver::new(config, Some(traces_output), None, None, false) .expect("Failed to create receiver"); let cancel_token = CancellationToken::new(); @@ -1545,63 +1523,63 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_send_with_cancellation_blocked_channel() { - use crate::bounded_channel::bounded; - use crate::receivers::otlp_output::OTLPOutput; - use crate::topology::payload; - use tokio::time::Duration; - use tokio_util::sync::CancellationToken; - - // Create a channel with size 1 that we'll fill to block it - let (tx, mut rx) = bounded::>(1); - let output = OTLPOutput::new(tx); - - // Fill the channel to make it block by sending a message but not receiving it - let blocking_message = payload::Message::new(None, vec![ResourceSpans::default()]); - output - .send(blocking_message) - .await - .expect("Should be able to send first message"); - - // Now the channel is full (size 1) - any new send will block - - // Create a message to send - let message = payload::Message::new(None, vec![ResourceSpans::default()]); - - // Create cancellation token - let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); - - // Clone output for the async task - let output_clone = output.clone(); - - // Spawn task that will try to send with cancellation - let send_task = tokio::spawn(async move { - KafkaReceiver::send_with_cancellation( - &output_clone, - message, - &cancel_token_clone, - "test", - ) - .await - }); - - // Give the task time to start and block on the send - tokio::time::sleep(Duration::from_millis(250)).await; - - // Cancel the operation - cancel_token.cancel(); - - // Wait for the task to complete and verify it returns SendCancelled error - let result = tokio::time::timeout(Duration::from_millis(500), send_task) - .await - .expect("Task should complete within timeout") - .expect("Task should not panic"); - - assert!(matches!(result, Err(KafkaReceiverError::SendCancelled))); - - // Clean up: drain the channel to prevent any hanging - rx.next().await; - } + // #[tokio::test] + // async fn test_send_with_cancellation_blocked_channel() { + // use crate::bounded_channel::bounded; + // use crate::receivers::otlp_output::OTLPOutput; + // use crate::topology::payload; + // use tokio::time::Duration; + // use tokio_util::sync::CancellationToken; + + // // Create a channel with size 1 that we'll fill to block it + // let (tx, mut rx) = bounded::>(1); + // let output = OTLPOutput::new(tx); + + // // Fill the channel to make it block by sending a message but not receiving it + // let blocking_message = payload::Message::new(None, vec![ResourceSpans::default()]); + // output + // .send(blocking_message) + // .await + // .expect("Should be able to send first message"); + + // // Now the channel is full (size 1) - any new send will block + + // // Create a message to send + // let message = payload::Message::new(None, vec![ResourceSpans::default()]); + + // // Create cancellation token + // let cancel_token = CancellationToken::new(); + // let cancel_token_clone = cancel_token.clone(); + + // // Clone output for the async task + // let output_clone = output.clone(); + + // // Spawn task that will try to send with cancellation + // let send_task = tokio::spawn(async move { + // KafkaReceiver::send_with_cancellation( + // &output_clone, + // message, + // &cancel_token_clone, + // "test", + // ) + // .await + // }); + + // // Give the task time to start and block on the send + // tokio::time::sleep(Duration::from_millis(250)).await; + + // // Cancel the operation + // cancel_token.cancel(); + + // // Wait for the task to complete and verify it returns SendCancelled error + // let result = tokio::time::timeout(Duration::from_millis(500), send_task) + // .await + // .expect("Task should complete within timeout") + // .expect("Task should not panic"); + + // assert!(matches!(result, Err(KafkaReceiverError::SendCancelled))); + + // // Clean up: drain the channel to prevent any hanging + // rx.next().await; + // } }