From 2a86c92dc737b569327949f8065ec5bf98720a58 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 18 Dec 2025 15:19:49 +0000 Subject: [PATCH 1/5] Panic instead of returning None when parsing failed due to unexpected message length or bad continuation marker --- crates/fluss/src/record/arrow.rs | 37 ++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 806c9a58..1339003b 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -534,18 +534,19 @@ fn parse_ipc_message( const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; if data.len() < 8 { - return None; + panic!("Invalid data length: {}", data.len()); } let continuation = LittleEndian::read_u32(&data[0..4]); let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; if continuation != CONTINUATION_MARKER { - return None; + panic!("Invalid continuation marker: {}", continuation); } if data.len() < 8 + metadata_size { - return None; + panic!("Invalid data length. Remaining data length {} is shorter than specified size {}", + data.len() - 8, metadata_size); } let metadata_bytes = &data[8..8 + metadata_size]; @@ -577,7 +578,7 @@ pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { SchemaRef::new(arrow_schema::Schema::new(fields)) } _ => { - panic!("must be row data tyoe.") + panic!("Must be row data type.") } } } @@ -1017,4 +1018,32 @@ mod tests { fn test_timestamp_ltz_invalid_precision() { to_arrow_type(&DataTypes::timestamp_ltz_with_precision(10)); } + + #[test] + fn test_parse_ipc_message_empty() { + assert_eq!( + parse_ipc_message(&u64::to_le_bytes(0x00000000FFFFFFFF)), + None + ); + } + #[test] + #[should_panic(expected = "Invalid data length: 0")] + fn test_parse_ipc_message_invalid_data_length() { + let invalid_data = &[]; + parse_ipc_message(invalid_data); + } + + #[test] + #[should_panic(expected = "Invalid continuation marker: 1")] + fn test_parse_ipc_message_invalid_continuation_marker() { + let data_with_invalid_continuation = &u64::to_le_bytes(0x0000000000000001); + parse_ipc_message(data_with_invalid_continuation); + } + + #[test] + #[should_panic(expected = "Invalid data length. Remaining data length 0 is shorter than specified size 1")] + fn test_parse_ipc_message_data_length_shorter_than_metadata_size() { + let data_with_invalid_continuation = &u64::to_le_bytes(0x00000001FFFFFFFF); + parse_ipc_message(data_with_invalid_continuation); + } } From 8eb2f6ef7560cb0f70448feb9ed0b9d576546150 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 18 Dec 2025 15:30:57 +0000 Subject: [PATCH 2/5] Panic instead of returning None when parsing failed due to unexpected message length or bad continuation marker --- crates/fluss/src/record/arrow.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 1339003b..654fd3d9 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -523,7 +523,8 @@ impl<'a> LogRecordBatch<'a> { /// - `body_buffer`: The buffer containing the record batch body data. /// - `version`: The Arrow IPC metadata version. /// -/// Returns `None` if the data is malformed or too short. +/// # Panics +/// Panics if the data is malformed e.g. too short or bad continuation marker. fn parse_ipc_message( data: &[u8], ) -> Option<( From 6cfad21fd231997a5d99e241c553ea9a0c47f341 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Fri, 19 Dec 2025 21:06:04 +0000 Subject: [PATCH 3/5] Returning Err instead of returning None / panic when parsing failed due to unexpected message length or bad continuation marker --- crates/fluss/src/client/table/scanner.rs | 2 +- crates/fluss/src/error.rs | 2 +- crates/fluss/src/record/arrow.rs | 109 +++++++++++++---------- 3 files changed, 63 insertions(+), 50 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index f6780d71..82ab9273 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -204,7 +204,7 @@ impl LogFetcher { log_scanner_status: Arc, projected_fields: Option>, ) -> Result { - let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); + let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?; let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone()); let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index b1d5d13b..63438b19 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -39,7 +39,7 @@ pub enum Error { #[error("Row convert error")] RowConvertError(String), - #[error("arrow error")] + #[error("Arrow error: {0}")] ArrowError(#[from] ArrowError), #[error("Write error: {0}")] diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 654fd3d9..e16aa862 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -34,7 +34,7 @@ use arrow::{ writer::StreamWriter, }, }; -use arrow_schema::SchemaRef; +use arrow_schema::{SchemaRef}; use arrow_schema::{DataType as ArrowDataType, Field}; use byteorder::WriteBytesExt; use byteorder::{ByteOrder, LittleEndian}; @@ -44,6 +44,7 @@ use std::{ io::{Cursor, Write}, sync::Arc, }; +use arrow_schema::ArrowError::ParseError; /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; @@ -165,7 +166,7 @@ pub struct RowAppendRecordBatchBuilder { impl RowAppendRecordBatchBuilder { pub fn new(row_type: &DataType) -> Self { - let schema_ref = to_arrow_schema(row_type); + let schema_ref = to_arrow_schema(row_type).unwrap(); let builders = Mutex::new( schema_ref .fields() @@ -489,19 +490,15 @@ impl<'a> LogRecordBatch<'a> { let data = &self.data[RECORDS_OFFSET..]; let record_batch = read_context.record_batch(data)?; - let log_record_iterator = match record_batch { - None => LogRecordIterator::empty(), - Some(record_batch) => { - let arrow_reader = ArrowReader::new(Arc::new(record_batch)); - LogRecordIterator::Arrow(ArrowLogRecordIterator { - reader: arrow_reader, - base_offset: self.base_log_offset(), - timestamp: self.commit_timestamp(), - row_id: 0, - change_type: ChangeType::AppendOnly, - }) - } - }; + let arrow_reader = ArrowReader::new(Arc::new(record_batch)); + let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator { + reader: arrow_reader, + base_offset: self.base_log_offset(), + timestamp: self.commit_timestamp(), + row_id: 0, + change_type: ChangeType::AppendOnly, + }); + Ok(log_record_iterator) } } @@ -518,16 +515,16 @@ impl<'a> LogRecordBatch<'a> { /// * `data` - The byte slice containing the IPC message. /// /// # Returns -/// Returns `Some((batch_metadata, body_buffer, version))` on success: +/// Returns `Ok((batch_metadata, body_buffer, version))` on success: /// - `batch_metadata`: The RecordBatch metadata from the IPC message. /// - `body_buffer`: The buffer containing the record batch body data. /// - `version`: The Arrow IPC metadata version. /// -/// # Panics -/// Panics if the data is malformed e.g. too short or bad continuation marker. +/// Returns `Err(arrow_error)` on errors +/// - `arrow_error`: Error details e.g. malformed, too short or bad continuation marker. fn parse_ipc_message( data: &[u8], -) -> Option<( +) ->Result<( arrow::ipc::RecordBatch<'_>, Buffer, arrow::ipc::MetadataVersion, @@ -535,33 +532,35 @@ fn parse_ipc_message( const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; if data.len() < 8 { - panic!("Invalid data length: {}", data.len()); + Err(ParseError(format!("Invalid data length: {}", data.len())))? } let continuation = LittleEndian::read_u32(&data[0..4]); let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; if continuation != CONTINUATION_MARKER { - panic!("Invalid continuation marker: {}", continuation); + Err(ParseError(format!("Invalid continuation marker: {}", continuation)))? } if data.len() < 8 + metadata_size { - panic!("Invalid data length. Remaining data length {} is shorter than specified size {}", - data.len() - 8, metadata_size); + Err(ParseError(format!("Invalid data length. Remaining data length {} is shorter than specified size {}", + data.len() - 8, metadata_size)))? } let metadata_bytes = &data[8..8 + metadata_size]; - let message = root_as_message(metadata_bytes).ok()?; - let batch_metadata = message.header_as_record_batch()?; + let message = root_as_message(metadata_bytes) + .map_err(|err| { ParseError(err.to_string()) })?; + let batch_metadata = message.header_as_record_batch() + .ok_or(ParseError(String::from("Not a record batch")))?; let body_start = 8 + metadata_size; let body_data = &data[body_start..]; let body_buffer = Buffer::from(body_data); - Some((batch_metadata, body_buffer, message.version())) + Ok((batch_metadata, body_buffer, message.version())) } -pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { +pub fn to_arrow_schema(fluss_schema: &DataType) -> Result { match &fluss_schema { DataType::Row(row_type) => { let fields: Vec = row_type @@ -576,10 +575,10 @@ pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { }) .collect(); - SchemaRef::new(arrow_schema::Schema::new(fields)) + Ok(SchemaRef::new(arrow_schema::Schema::new(fields))) } _ => { - panic!("Must be row data type.") + Err(ParseError(String::from("Must be row data type.")))? } } } @@ -768,11 +767,8 @@ impl ReadContext { .map(|p| p.ordered_fields.as_slice()) } - pub fn record_batch(&self, data: &[u8]) -> Result> { - let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) { - Some(result) => result, - None => return Ok(None), - }; + pub fn record_batch(&self, data: &[u8]) -> Result { + let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?; // the record batch from server must be ordered by field pos, // according to project to decide what arrow schema to use @@ -809,7 +805,7 @@ impl ReadContext { } _ => record_batch, }; - Ok(Some(record_batch)) + Ok(record_batch) } } @@ -1021,30 +1017,47 @@ mod tests { } #[test] - fn test_parse_ipc_message_empty() { - assert_eq!( - parse_ipc_message(&u64::to_le_bytes(0x00000000FFFFFFFF)), - None - ); + fn test_parse_ipc_message_empty_body() { + let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]); + let result = parse_ipc_message(empty_body); + assert_eq!(result.unwrap_err().to_string(), + String::from("Arrow error: Parser error: Range [0, 4) is out of bounds.\n\n")); } + #[test] - #[should_panic(expected = "Invalid data length: 0")] fn test_parse_ipc_message_invalid_data_length() { let invalid_data = &[]; - parse_ipc_message(invalid_data); + assert_eq!(parse_ipc_message(invalid_data).unwrap_err().to_string(), + String::from("Arrow error: Parser error: Invalid data length: 0")); } #[test] - #[should_panic(expected = "Invalid continuation marker: 1")] fn test_parse_ipc_message_invalid_continuation_marker() { - let data_with_invalid_continuation = &u64::to_le_bytes(0x0000000000000001); - parse_ipc_message(data_with_invalid_continuation); + let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]); + assert_eq!(parse_ipc_message(data_with_invalid_continuation).unwrap_err().to_string(), + String::from("Arrow error: Parser error: Invalid continuation marker: 1")); } #[test] - #[should_panic(expected = "Invalid data length. Remaining data length 0 is shorter than specified size 1")] fn test_parse_ipc_message_data_length_shorter_than_metadata_size() { - let data_with_invalid_continuation = &u64::to_le_bytes(0x00000001FFFFFFFF); - parse_ipc_message(data_with_invalid_continuation); + let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]); + assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), + String::from("Arrow error: Parser error: Invalid data length. \ + Remaining data length 0 is shorter than specified size 1")); + } + + #[test] + fn test_parse_ipc_message_not_a_record_batch() { + let data_with_invalid_length = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]); + assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), + String::from("Arrow error: Parser error: Not a record batch")); + } + + fn le_bytes(vals: &[u32]) -> Vec { + let mut out = Vec::with_capacity(vals.len() * 4); + for &v in vals { + out.extend_from_slice(&v.to_le_bytes()); + } + out } } From 1531605751c6af91154c7c55ced8aead835f1ddc Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Fri, 19 Dec 2025 21:28:13 +0000 Subject: [PATCH 4/5] Returning Err instead of returning None / panic when parsing failed due to unexpected message length or bad continuation marker --- crates/fluss/src/record/arrow.rs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index e16aa862..87ac8bde 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -34,7 +34,7 @@ use arrow::{ writer::StreamWriter, }, }; -use arrow_schema::{SchemaRef}; +use arrow_schema::SchemaRef; use arrow_schema::{DataType as ArrowDataType, Field}; use byteorder::WriteBytesExt; use byteorder::{ByteOrder, LittleEndian}; @@ -1017,37 +1017,25 @@ mod tests { } #[test] - fn test_parse_ipc_message_empty_body() { + fn test_parse_ipc_message() { let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]); let result = parse_ipc_message(empty_body); assert_eq!(result.unwrap_err().to_string(), String::from("Arrow error: Parser error: Range [0, 4) is out of bounds.\n\n")); - } - #[test] - fn test_parse_ipc_message_invalid_data_length() { let invalid_data = &[]; assert_eq!(parse_ipc_message(invalid_data).unwrap_err().to_string(), String::from("Arrow error: Parser error: Invalid data length: 0")); - } - #[test] - fn test_parse_ipc_message_invalid_continuation_marker() { let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]); assert_eq!(parse_ipc_message(data_with_invalid_continuation).unwrap_err().to_string(), String::from("Arrow error: Parser error: Invalid continuation marker: 1")); - } - #[test] - fn test_parse_ipc_message_data_length_shorter_than_metadata_size() { let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]); assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), String::from("Arrow error: Parser error: Invalid data length. \ Remaining data length 0 is shorter than specified size 1")); - } - #[test] - fn test_parse_ipc_message_not_a_record_batch() { let data_with_invalid_length = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]); assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), String::from("Arrow error: Parser error: Not a record batch")); From f6791c10b43ac9cf02805f0cb9fb8abe1fac3ba3 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sat, 20 Dec 2025 11:29:53 +0800 Subject: [PATCH 5/5] fix ci --- crates/fluss/src/client/table/scanner.rs | 2 +- crates/fluss/src/record/arrow.rs | 69 ++++++++++++++++-------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 82ab9273..f6780d71 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -204,7 +204,7 @@ impl LogFetcher { log_scanner_status: Arc, projected_fields: Option>, ) -> Result { - let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?; + let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone()); let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 87ac8bde..e343e3c8 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -34,6 +34,7 @@ use arrow::{ writer::StreamWriter, }, }; +use arrow_schema::ArrowError::ParseError; use arrow_schema::SchemaRef; use arrow_schema::{DataType as ArrowDataType, Field}; use byteorder::WriteBytesExt; @@ -44,7 +45,6 @@ use std::{ io::{Cursor, Write}, sync::Arc, }; -use arrow_schema::ArrowError::ParseError; /// const for record batch pub const BASE_OFFSET_LENGTH: usize = 8; @@ -166,7 +166,7 @@ pub struct RowAppendRecordBatchBuilder { impl RowAppendRecordBatchBuilder { pub fn new(row_type: &DataType) -> Self { - let schema_ref = to_arrow_schema(row_type).unwrap(); + let schema_ref = to_arrow_schema(row_type); let builders = Mutex::new( schema_ref .fields() @@ -524,7 +524,7 @@ impl<'a> LogRecordBatch<'a> { /// - `arrow_error`: Error details e.g. malformed, too short or bad continuation marker. fn parse_ipc_message( data: &[u8], -) ->Result<( +) -> Result<( arrow::ipc::RecordBatch<'_>, Buffer, arrow::ipc::MetadataVersion, @@ -539,18 +539,23 @@ fn parse_ipc_message( let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; if continuation != CONTINUATION_MARKER { - Err(ParseError(format!("Invalid continuation marker: {}", continuation)))? + Err(ParseError(format!( + "Invalid continuation marker: {continuation}" + )))? } if data.len() < 8 + metadata_size { - Err(ParseError(format!("Invalid data length. Remaining data length {} is shorter than specified size {}", - data.len() - 8, metadata_size)))? + Err(ParseError(format!( + "Invalid data length. Remaining data length {} is shorter than specified size {}", + data.len() - 8, + metadata_size + )))? } let metadata_bytes = &data[8..8 + metadata_size]; - let message = root_as_message(metadata_bytes) - .map_err(|err| { ParseError(err.to_string()) })?; - let batch_metadata = message.header_as_record_batch() + let message = root_as_message(metadata_bytes).map_err(|err| ParseError(err.to_string()))?; + let batch_metadata = message + .header_as_record_batch() .ok_or(ParseError(String::from("Not a record batch")))?; let body_start = 8 + metadata_size; @@ -560,7 +565,7 @@ fn parse_ipc_message( Ok((batch_metadata, body_buffer, message.version())) } -pub fn to_arrow_schema(fluss_schema: &DataType) -> Result { +pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { match &fluss_schema { DataType::Row(row_type) => { let fields: Vec = row_type @@ -575,10 +580,10 @@ pub fn to_arrow_schema(fluss_schema: &DataType) -> Result { }) .collect(); - Ok(SchemaRef::new(arrow_schema::Schema::new(fields))) + SchemaRef::new(arrow_schema::Schema::new(fields)) } _ => { - Err(ParseError(String::from("Must be row data type.")))? + panic!("must be row data type.") } } } @@ -1020,25 +1025,43 @@ mod tests { fn test_parse_ipc_message() { let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]); let result = parse_ipc_message(empty_body); - assert_eq!(result.unwrap_err().to_string(), - String::from("Arrow error: Parser error: Range [0, 4) is out of bounds.\n\n")); + assert_eq!( + result.unwrap_err().to_string(), + String::from("Arrow error: Parser error: Range [0, 4) is out of bounds.\n\n") + ); let invalid_data = &[]; - assert_eq!(parse_ipc_message(invalid_data).unwrap_err().to_string(), - String::from("Arrow error: Parser error: Invalid data length: 0")); + assert_eq!( + parse_ipc_message(invalid_data).unwrap_err().to_string(), + String::from("Arrow error: Parser error: Invalid data length: 0") + ); let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]); - assert_eq!(parse_ipc_message(data_with_invalid_continuation).unwrap_err().to_string(), - String::from("Arrow error: Parser error: Invalid continuation marker: 1")); + assert_eq!( + parse_ipc_message(data_with_invalid_continuation) + .unwrap_err() + .to_string(), + String::from("Arrow error: Parser error: Invalid continuation marker: 1") + ); let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]); - assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), - String::from("Arrow error: Parser error: Invalid data length. \ - Remaining data length 0 is shorter than specified size 1")); + assert_eq!( + parse_ipc_message(data_with_invalid_length) + .unwrap_err() + .to_string(), + String::from( + "Arrow error: Parser error: Invalid data length. \ + Remaining data length 0 is shorter than specified size 1" + ) + ); let data_with_invalid_length = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]); - assert_eq!(parse_ipc_message(data_with_invalid_length).unwrap_err().to_string(), - String::from("Arrow error: Parser error: Not a record batch")); + assert_eq!( + parse_ipc_message(data_with_invalid_length) + .unwrap_err() + .to_string(), + String::from("Arrow error: Parser error: Not a record batch") + ); } fn le_bytes(vals: &[u32]) -> Vec {