diff --git a/fff-bench/src/bench_data.rs b/fff-bench/src/bench_data.rs index 3bdc8e7..8135db8 100644 --- a/fff-bench/src/bench_data.rs +++ b/fff-bench/src/bench_data.rs @@ -994,6 +994,7 @@ impl BenchmarkDatasets { let fff = std::fs::OpenOptions::new() .write(true) .create(true) + .truncate(true) .open(output_path) .unwrap(); write_fff(&batches, &fff, options.clone()).unwrap(); @@ -1029,7 +1030,7 @@ impl BenchmarkDatasets { if PRINT_CR { error!( "FFF CR: {:.2}", - fff_size as f64 / get_arrow_size(self, &output_fname) as f64 + fff_size as f64 / get_arrow_size(self, output_fname) as f64 ); } } @@ -1240,9 +1241,14 @@ impl BenchmarkDataset for BenchmarkDatasets { BenchmarkDatasets::CFB(_) => { write_csv_as_parquet(f, output_path, ",", "", opt.rg_size, false) } - BenchmarkDatasets::PBI(_) => { - write_csv_as_parquet(f, output_path, "|", "null", opt.rg_size, opt.is_dict_scope) - } + BenchmarkDatasets::PBI(_) => write_csv_as_parquet( + f, + output_path, + "|", + "null", + opt.rg_size, + opt.is_dict_scope, + ), BenchmarkDatasets::LAION(_) => { unreachable!() } @@ -1519,8 +1525,10 @@ impl BenchmarkDataset for BenchmarkDatasets { } async fn write_as_lance_v2_1(&self) { - let mut opts = LanceFileWriterOptions::default(); - opts.format_version = Some(LanceFileVersion::V2_1); + let opts = lance_file::v2::writer::FileWriterOptions { + format_version: Some(LanceFileVersion::V2_1), + ..Default::default() + }; self._write_as_lance(opts).await } diff --git a/fff-bench/src/lib.rs b/fff-bench/src/lib.rs index 0564e47..13d4d26 100644 --- a/fff-bench/src/lib.rs +++ b/fff-bench/src/lib.rs @@ -95,7 +95,7 @@ pub fn write_fff(batches: &[RecordBatch], fff: &File, options: FileWriterOptions let mut memory_usage_sum = 0; let mut cnt = 0; for batch in batches { - fff_writer.write_batch(&batch).unwrap(); + fff_writer.write_batch(batch).unwrap(); if fff_writer.memory_size() != 0 { memory_usage_sum += fff_writer.memory_size(); cnt += 1; @@ -122,10 +122,11 @@ pub fn read_fff(pathbuf: PathBuf, opt: ReadFFFOpt) -> Result> { object_store::aws::AmazonS3Builder::from_env() .with_url(path) .with_retry({ - let mut res = object_store::RetryConfig::default(); - res.retry_timeout = std::time::Duration::from_secs(10); - res.max_retries = 1; - res + object_store::RetryConfig { + retry_timeout: std::time::Duration::from_secs(10), + max_retries: 1, + ..Default::default() + } }) .build() .unwrap(), @@ -134,7 +135,7 @@ pub fn read_fff(pathbuf: PathBuf, opt: ReadFFFOpt) -> Result> { // count the number of slashes let slash_count = path.chars().filter(|&c| c == '/').count(); assert!(slash_count == 3, "only support s3://bucket/file_name"); - let file_name = path.split('/').last().unwrap(); + let file_name = path.split('/').next_back().unwrap(); let f1 = fff_poc::io::reader::ObjectStoreReadAt::new( bucket.clone(), object_store::path::Path::from(file_name).into(), @@ -172,7 +173,7 @@ pub fn read_fff_aot_wasm( pub fn write_parquet(batches: &[RecordBatch], parquet: &File) -> Result<()> { let mut parquet_writer = ArrowWriter::try_new(parquet, batches[0].schema(), None).unwrap(); for batch in batches { - parquet_writer.write(&batch).unwrap(); + parquet_writer.write(batch).unwrap(); } parquet_writer.close().unwrap(); Ok(()) @@ -234,7 +235,7 @@ pub async fn read_lance( full_path: bool, ) -> Result { let object_store = Arc::new({ - let res = if path.starts_with("s3://") { + if path.starts_with("s3://") { let (store, _) = lance_io::object_store::ObjectStore::from_uri(path) .await .unwrap(); @@ -243,8 +244,7 @@ pub async fn read_lance( let mut res = lance_io::object_store::ObjectStore::local(); res.set_io_parallelism(1); res - }; - res + } }); let path = if full_path { if path.starts_with("s3://") { @@ -318,6 +318,7 @@ pub fn write_orc(batches: &[RecordBatch], path: &str) -> Result<()> { let file = std::fs::OpenOptions::new() .write(true) .create(true) + .truncate(true) .open(path)?; let mut writer = orc_rust::ArrowWriterBuilder::new(file, batches[0].schema()) .try_build() @@ -401,7 +402,7 @@ pub fn parquet_decompress_from( let builder = if let Some(projections) = projections { let file_metadata = builder.metadata().file_metadata().clone(); let mask = - ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().map(|&x| x)); + ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().copied()); builder.with_projection(mask) } else { builder @@ -440,7 +441,7 @@ pub async fn parquet_decompress_from_async( let builder = if let Some(projections) = projections { let file_metadata = builder.metadata().file_metadata().clone(); let mask = - ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().map(|&x| x)); + ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().copied()); builder.with_projection(mask) } else { builder diff --git a/fff-core/src/lib.rs b/fff-core/src/lib.rs index d46eef3..edceae8 100644 --- a/fff-core/src/lib.rs +++ b/fff-core/src/lib.rs @@ -1,3 +1,3 @@ pub mod errors; +pub mod macros; pub mod util; -pub mod macros; \ No newline at end of file diff --git a/fff-core/src/util/bit_util.rs b/fff-core/src/util/bit_util.rs index 1175256..33f0b95 100644 --- a/fff-core/src/util/bit_util.rs +++ b/fff-core/src/util/bit_util.rs @@ -11,4 +11,4 @@ pub fn ceil(value: T, divisor: T) -> T { #[inline] pub fn padding_size(size: usize, alignment: usize) -> usize { size.next_multiple_of(alignment) - size -} \ No newline at end of file +} diff --git a/fff-core/src/util/buffer_to_array.rs b/fff-core/src/util/buffer_to_array.rs index 41e2a6f..02be66a 100644 --- a/fff-core/src/util/buffer_to_array.rs +++ b/fff-core/src/util/buffer_to_array.rs @@ -345,7 +345,7 @@ pub fn primitive_array_from_arrow_buffers_iter( let null_buffer = arrow_buffer_to_validity(null_buffer, num_rows); let data_buffer = buffer_iter.next().unwrap(); - let data_buffer = Buffer::from(data_buffer); + let data_buffer = data_buffer; let data_buffer = BooleanBuffer::new(data_buffer, 0, num_rows as usize); Ok(Arc::new(BooleanArray::new(data_buffer, null_buffer))) @@ -734,12 +734,12 @@ pub fn primitive_array_from_buffers( DataType::List(child) => Ok(new_list_offsets_validity::( buffers, num_rows, - Arc::clone(&child), + Arc::clone(child), )), DataType::LargeList(child) => Ok(new_list_offsets_validity::( buffers, num_rows, - Arc::clone(&child), + Arc::clone(child), )), _ => Err(Error::IO( format!( diff --git a/fff-encoding/src/data_buffer.rs b/fff-encoding/src/data_buffer.rs index 810b7f2..0238040 100644 --- a/fff-encoding/src/data_buffer.rs +++ b/fff-encoding/src/data_buffer.rs @@ -27,7 +27,7 @@ pub(crate) struct DataBuffersIter<'a> { index: usize, } -impl<'a> Iterator for DataBuffersIter<'a> { +impl Iterator for DataBuffersIter<'_> { type Item = Bytes; fn next(&mut self) -> Option { diff --git a/fff-encoding/src/enc_unit.rs b/fff-encoding/src/enc_unit.rs index 7551e21..29d7d4d 100644 --- a/fff-encoding/src/enc_unit.rs +++ b/fff-encoding/src/enc_unit.rs @@ -143,17 +143,17 @@ impl FlatEncUnit { self.encoding_tree.serialize(&mut s).unwrap(); let encoding_tree = s.take_buffer(); write.write_u32::(encoding_tree.len() as u32)?; - write.write(&encoding_tree)?; + write.write_all(&encoding_tree)?; let written_len = 4 + self.buffer_sizes.as_ref().unwrap().len() * 4 + 4 + encoding_tree.len(); - write.write(&ZEROS[..padding_size(written_len, ALIGNMENT)])?; + write.write_all(&ZEROS[..padding_size(written_len, ALIGNMENT)])?; for buf in self.buffers.iter() { for buffer in buf.iter() { write.write_all(buffer.as_ref())?; } } let written_len = write.stream_position()? - start; - write.write(&ZEROS[..padding_size(written_len as usize, ALIGNMENT)])?; + write.write_all(&ZEROS[..padding_size(written_len as usize, ALIGNMENT)])?; Ok(write) } @@ -185,7 +185,7 @@ impl FlatEncUnit { Ok(Self { encoding_tree, buffer_sizes: None, - buffers: buffers.into_iter().map(|b| DataBuffer::Dense(b)).collect(), + buffers: buffers.into_iter().map(DataBuffer::Dense).collect(), }) } diff --git a/fff-encoding/src/lib.rs b/fff-encoding/src/lib.rs index 505c9b2..2c52a1f 100644 --- a/fff-encoding/src/lib.rs +++ b/fff-encoding/src/lib.rs @@ -1,3 +1,3 @@ mod data_buffer; +pub mod enc_unit; pub mod schemes; -pub mod enc_unit; \ No newline at end of file diff --git a/fff-encoding/src/schemes/bp.rs b/fff-encoding/src/schemes/bp.rs index 2002085..29dc9dc 100644 --- a/fff-encoding/src/schemes/bp.rs +++ b/fff-encoding/src/schemes/bp.rs @@ -1,6 +1,5 @@ /// Deprecated after using Vortex. /// Leave it here only for legacy usage. - use std::mem; use crate::enc_unit::ALIGNMENT; diff --git a/fff-encoding/src/schemes/vortex.rs b/fff-encoding/src/schemes/vortex.rs index 9f1f33a..8fd4288 100644 --- a/fff-encoding/src/schemes/vortex.rs +++ b/fff-encoding/src/schemes/vortex.rs @@ -108,10 +108,7 @@ impl VortexEncoder { } fn regular_encode(&self, arr: ArrayRef) -> Result { - debug_assert!(match arr.data_type() { - non_nest_types!() => true, - _ => false, - }); + debug_assert!(matches!(arr.data_type(), non_nest_types!())); Ok(EncUnit::new( self.encode_arr(arr)?, Encoding::Vortex, @@ -121,10 +118,10 @@ impl VortexEncoder { /// For List type, we only encode the validity and offsets, children are handled by logical encoders. fn list_encode(&self, arr: ArrayRef) -> Result { - debug_assert!(match arr.data_type() { - DataType::List(_) | DataType::LargeList(_) => true, - _ => false, - }); + debug_assert!(matches!( + arr.data_type(), + DataType::List(_) | DataType::LargeList(_) + )); let (validity, offsets) = match arr.data_type() { DataType::List(_) => { let list_array = arr @@ -177,10 +174,10 @@ impl VortexEncoder { } pub fn list_struct_encode(&self, list_arr: ArrayRef, field: ArrayRef) -> Result { - debug_assert!(match list_arr.data_type() { - DataType::List(_) | DataType::LargeList(_) => true, - _ => false, - }); + debug_assert!(matches!( + list_arr.data_type(), + DataType::List(_) | DataType::LargeList(_) + )); let (validity, offsets) = match list_arr.data_type() { DataType::List(_) => { let list_array = list_arr @@ -302,7 +299,7 @@ impl VortexDecoderBuilder { } pub fn with_ppd(mut self, ppd: VtxPPD) -> Self { - assert!(self.partial_decode == false); + assert!(!self.partial_decode); self.ppd = Some(ppd); self } @@ -419,7 +416,7 @@ impl Decoder for VortexDecoder { /// FIXME: decode_a_vector should be redesigned to be indexing based. Or just like slice. fn decode_a_vector(&mut self) -> Result>> { Ok(self.vortex_array.as_ref().map(|arr| { - let arr = slice(&arr, 0, 1024).expect("Slice failed"); + let arr = slice(arr, 0, 1024).expect("Slice failed"); let mut res = Vec::new(); let arrow_array = vortex_array_to_arrow(arr); if let Some(b) = arrow_array.nulls() { diff --git a/fff-format/build.rs b/fff-format/build.rs index 60f36a4..25bf4ec 100644 --- a/fff-format/build.rs +++ b/fff-format/build.rs @@ -10,16 +10,13 @@ fn main() -> Result<()> { ); // compile flatc if it does not exist - if flatc.check().is_err() { - if !Command::new("sh") + if flatc.check().is_err() && !Command::new("sh") .current_dir(Path::new(&env::var("CARGO_MANIFEST_DIR").unwrap()).join("..")) - .args(&["scripts/install_flatc.sh"]) + .args(["scripts/install_flatc.sh"]) .status() .with_context(|| "install_flatc.sh failed")? - .success() - { - anyhow::bail!("install_flatc.sh failed"); - } + .success() { + anyhow::bail!("install_flatc.sh failed"); } flatc @@ -45,7 +42,7 @@ fn main() -> Result<()> { .iter() .map(|p| p.as_path()) .collect::>(), - out_dir: Path::new(env::var("OUT_DIR").unwrap().as_str()).as_ref(), + out_dir: Path::new(env::var("OUT_DIR").unwrap().as_str()), extra: &["--filename-suffix", ""], ..Default::default() })?; diff --git a/fff-poc/src/context.rs b/fff-poc/src/context.rs index c341d59..2927137 100644 --- a/fff-poc/src/context.rs +++ b/fff-poc/src/context.rs @@ -107,14 +107,13 @@ impl WASMWritingContext { } pub fn data_type_to_wasm_id(&self, dt: &DataType) -> Option { - self.data_type_to_wasm_id.get(dt).map(|&x| x) + self.data_type_to_wasm_id.get(dt).copied() } pub fn data_type_to_wasm_lib(&self, dt: &DataType) -> Option { self.data_type_to_wasm_id .get(dt) - .map(|x| self.wasms.get(x).cloned()) - .flatten() + .and_then(|x| self.wasms.get(x).cloned()) } pub fn always_set_custom_wasm_for_built_in(&self) -> bool { diff --git a/fff-poc/src/decoder/encunit.rs b/fff-poc/src/decoder/encunit.rs index 823b029..db4ce6b 100644 --- a/fff-poc/src/decoder/encunit.rs +++ b/fff-poc/src/decoder/encunit.rs @@ -102,7 +102,7 @@ impl<'a> WASMEncUnitDecoder<'a> { } } -impl<'a> EncUnitDecoder for WASMEncUnitDecoder<'a> { +impl EncUnitDecoder for WASMEncUnitDecoder<'_> { fn decode(&self) -> Result { match &self.output_type { non_nest_types!() => { @@ -142,21 +142,13 @@ impl EncUnitDecoder for VortexEncUnitDecoder { vortex_decoder.decode_all_as_array()? } DataType::List(ref child) | DataType::LargeList(ref child) - if match child.data_type() { - DataType::Struct(fields) - if fields - .iter() - .map(|f| match f.data_type() { - non_nest_types!() => true, - _ => false, - }) - .fold(true, |acc, x| acc && x) - && cfg!(feature = "list-offsets-pushdown") => - { - true - } - _ => false, - } => + if matches!(child.data_type(), + DataType::Struct(fields) + if fields + .iter() + .all(|f| matches!(f.data_type(), non_nest_types!())) + && cfg!(feature = "list-offsets-pushdown") + ) => { let mut vortex_decoder = VortexListStructDecoder::try_new( bytes, @@ -199,21 +191,12 @@ impl EncUnitDecoder for VortexEncUnitDecoder { vortex_decoder.slice(start, stop)? } DataType::List(ref child) | DataType::LargeList(ref child) - if match child.data_type() { + if matches!(child.data_type(), DataType::Struct(fields) if fields .iter() - .map(|f| match f.data_type() { - non_nest_types!() => true, - _ => false, - }) - .fold(true, |acc, x| acc && x) - && cfg!(feature = "list-offsets-pushdown") => - { - true - } - _ => false, - } => + .all(|f| matches!(f.data_type(), non_nest_types!())) + && cfg!(feature = "list-offsets-pushdown")) => { let mut vortex_decoder = VortexListStructDecoder::try_new( bytes, diff --git a/fff-poc/src/decoder/logical.rs b/fff-poc/src/decoder/logical.rs index d8e02ba..7eadf54 100644 --- a/fff-poc/src/decoder/logical.rs +++ b/fff-poc/src/decoder/logical.rs @@ -66,7 +66,7 @@ impl PrimitiveColDecoder<'_, R> { )) })?; let computed_checksum = { - let mut checksum = create_checksum(&checksum_type); + let mut checksum = create_checksum(checksum_type); checksum.update(&buf); checksum.finalize() }; @@ -96,9 +96,7 @@ impl LogicalColDecoder for PrimitiveColDecoder<'_, R> { chunk_meta.encoding_as_shared_dictionary(), &self.primitive_type, encoded_chunk_buf, - self.wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + self.wasm_context.as_ref().map(Arc::clone), Some(self.shared_dictionary_cache), )?); while let Some(array) = self.chunk_decoder.as_mut().unwrap().decode_batch()? { @@ -143,9 +141,7 @@ impl LogicalColDecoder for PrimitiveColDecoder<'_, R> { chunk_meta.encoding_as_shared_dictionary(), &self.primitive_type, encoded_chunk_buf, - self.wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + self.wasm_context.as_ref().map(Arc::clone), Some(self.shared_dictionary_cache), )?); let mut decoded = 0; @@ -188,7 +184,7 @@ impl LogicalColDecoder for ListColDecoder<'_, R> { let arr = v_o.as_list::(); let offsets: ScalarBuffer = arr.to_data().buffers()[0].clone().into(); let offsets = OffsetBuffer::new(offsets); - let nulls = v_o.as_list::().nulls().map(|x| x.clone()); + let nulls = v_o.as_list::().nulls().cloned(); res.push(Arc::new(ListArray::new( // always return byte view array because of the change of underlining vortex. field_to_view(child.clone()), @@ -201,7 +197,7 @@ impl LogicalColDecoder for ListColDecoder<'_, R> { let offsets: ScalarBuffer = v_o.as_list::().to_data().buffers()[0].clone().into(); let offsets = OffsetBuffer::new(offsets); - let nulls = v_o.as_list::().nulls().map(|x| x.clone()); + let nulls = v_o.as_list::().nulls().cloned(); res.push(Arc::new(LargeListArray::new( field_to_view(child.clone()), offsets, @@ -327,7 +323,7 @@ impl LogicalListStructNonNestedColDecoder for ListStructColDecoder<'_ offsets[0] as usize, (offsets[len] - offsets[0]) as usize, )?; - let nulls = v_o.as_list::().nulls().map(|x| x.clone()); + let nulls = v_o.as_list::().nulls().cloned(); res.push(Arc::new(ListArray::new( Field::new_struct( child.name(), @@ -397,7 +393,7 @@ impl LogicalColDecoder for StructColDecoder<'_, R> { let res: Vec = validity .into_iter() - .zip(children.into_iter()) + .zip(children) .map(|(v, cs)| { // recover NullBuffer from BooleanArray let bool_array = v.as_boolean(); @@ -446,11 +442,7 @@ pub fn create_list_struct_decoder<'a, R: Reader>( DataType::Struct(fields) if fields .iter() - .map(|f| match f.data_type() { - non_nest_types!() => true, - _ => false, - }) - .fold(true, |acc, x| acc && x) => + .all(|f| matches!(f.data_type(), non_nest_types!())) => { if cfg!(feature = "list-offsets-pushdown") { // create offset-pushdown list struct decoder @@ -484,9 +476,7 @@ pub fn create_list_struct_decoder<'a, R: Reader>( _ => unreachable!(), } }, - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type: None, }); @@ -514,9 +504,7 @@ pub fn create_list_struct_decoder<'a, R: Reader>( chunk_decoder: None, chunks_meta_iter, primitive_type: field.data_type().clone(), - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type: None, }, @@ -537,9 +525,7 @@ pub fn create_list_struct_decoder<'a, R: Reader>( chunks_meta_iter }, primitive_type: DataType::Boolean, - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type: None, }, @@ -564,9 +550,7 @@ pub fn create_list_struct_decoder<'a, R: Reader>( chunks_meta_iter }, primitive_type: f.data_type().clone(), - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type: None, }) @@ -647,9 +631,7 @@ pub fn create_logical_decoder<'a, R: Reader>( chunks_meta_iter, // CAUTION: here we create a list primitive decoder but only output validity and offsets. primitive_type: field.data_type().clone(), - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type, }, @@ -672,9 +654,7 @@ pub fn create_logical_decoder<'a, R: Reader>( chunk_decoder: None, chunks_meta_iter, primitive_type: DataType::Boolean, - wasm_context: wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context: wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type, }, @@ -686,9 +666,7 @@ pub fn create_logical_decoder<'a, R: Reader>( Arc::clone(f), column_metas, column_idx, - wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type, ) diff --git a/fff-poc/src/decoder/physical.rs b/fff-poc/src/decoder/physical.rs index bd205fc..8a7880d 100644 --- a/fff-poc/src/decoder/physical.rs +++ b/fff-poc/src/decoder/physical.rs @@ -51,7 +51,7 @@ impl<'a, R: Reader> NoDictColDecoder<'a, R> { } } -impl<'a, R: Reader> ChunkDecoder for NoDictColDecoder<'a, R> { +impl ChunkDecoder for NoDictColDecoder<'_, R> { fn decode_batch(&mut self) -> Result> { let encunit = self.encunit_iter.next(); if encunit.is_none() { @@ -70,9 +70,9 @@ impl<'a, R: Reader> ChunkDecoder for NoDictColDecoder<'a, R> { self.data_type.clone(), self.wasm_context .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + .map(Arc::clone), )?; - decoder.decode().map(|array| Some(array)) + decoder.decode().map(Some) } fn decode_row_at(&mut self, row_id_in_chunk: usize, len: usize) -> Result> { @@ -91,7 +91,7 @@ impl<'a, R: Reader> ChunkDecoder for NoDictColDecoder<'a, R> { if cur >= row_id_in_chunk { let idx = row_id_in_chunk - last_cur; let to_decode = std::cmp::min(remaining, enc_unit_num_rows - idx); - remaining = remaining - to_decode; + remaining -= to_decode; let data = self .encoded_chunk_buf .split_to(encblock_fb.size_() as usize); @@ -103,7 +103,7 @@ impl<'a, R: Reader> ChunkDecoder for NoDictColDecoder<'a, R> { self.data_type.clone(), self.wasm_context .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + .map(Arc::clone), )?; // Return the array with only one element at the given index. let array = match decoder.slice(idx, idx + to_decode) { @@ -269,7 +269,7 @@ macro_rules! dict_index_to_data { }}; } -impl<'a, R: Reader> ChunkDecoder for DictColDecoder<'a, R> { +impl ChunkDecoder for DictColDecoder<'_, R> { fn decode_batch(&mut self) -> Result> { let dict_encunit = self.encunit_iter.next(); if dict_encunit.is_none() { @@ -288,7 +288,7 @@ impl<'a, R: Reader> ChunkDecoder for DictColDecoder<'a, R> { self.data_type.clone(), self.wasm_context .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + .map(Arc::clone), )?; let dict = if dict_encblock_fb.num_rows() > 0 { dict_decoder.decode()? @@ -312,7 +312,7 @@ impl<'a, R: Reader> ChunkDecoder for DictColDecoder<'a, R> { DataType::Int64, self.wasm_context .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + .map(Arc::clone), )?; let indices_ref = indices_decoder.decode()?; let indices = indices_ref.as_any().downcast_ref::().ok_or( @@ -415,7 +415,7 @@ impl<'a, R: Reader> SharedDictColDecoder<'a, R> { } } -impl<'a, R: Reader> ChunkDecoder for SharedDictColDecoder<'a, R> { +impl ChunkDecoder for SharedDictColDecoder<'_, R> { fn decode_batch(&mut self) -> Result> { let index_encunit = self.encunit_iter.next(); if index_encunit.is_none() { @@ -434,7 +434,7 @@ impl<'a, R: Reader> ChunkDecoder for SharedDictColDecoder<'a, R> { DataType::Int64, self.wasm_context .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + .map(Arc::clone), )?; let indices = indices_decoder.decode()?; let dict = &self.shared_dictionary; diff --git a/fff-poc/src/dict.rs b/fff-poc/src/dict.rs index eb6faae..b5eb5b1 100644 --- a/fff-poc/src/dict.rs +++ b/fff-poc/src/dict.rs @@ -324,7 +324,8 @@ impl Dictionary { "Array type mismatch in submit values".to_string(), ))?; // TODO: unnecessary copy of strings - Ok(typed_dict.submit_values(arr.into_iter().map(|x| x.map(|y| y.to_owned())))) + typed_dict.submit_values(arr.into_iter().map(|x| x.map(|y| y.to_owned()))); + Ok(()) } DataType::LargeUtf8 => { let typed_dict = self @@ -340,7 +341,8 @@ impl Dictionary { "Array type mismatch in submit values".to_string(), ))?; // TODO: unnecessary copy of strings - Ok(typed_dict.submit_values(arr.into_iter().map(|x| x.map(|y| y.to_owned())))) + typed_dict.submit_values(arr.into_iter().map(|x| x.map(|y| y.to_owned()))); + Ok(()) } DataType::Timestamp(TimeUnit::Second, _) => { typed_dict_submit_values!(self, arr, TimestampSecondArray, i64, i64) @@ -558,14 +560,12 @@ impl, U: Ord + std::hash::Hash + Clone> TypedDict { &self.indices[slice_begin_idx..] } pub fn submit_values(&mut self, arr: impl Iterator>) { - for val in arr { - if let Some(val) = val { - let hash_val = T::dict_hash(&val); - if self.key_to_ind.get(&hash_val).is_none() { - let ind = self.dictionary.len(); - self.key_to_ind.insert(hash_val, ind as u64); - self.dictionary.push(val.clone()); - } + for val in arr.flatten() { + let hash_val = T::dict_hash(&val); + if let std::collections::hash_map::Entry::Vacant(e) = self.key_to_ind.entry(hash_val) { + let ind = self.dictionary.len(); + e.insert(ind as u64); + self.dictionary.push(val.clone()); } } } @@ -574,10 +574,10 @@ impl, U: Ord + std::hash::Hash + Clone> TypedDict { T: 'a, { for val in arr { - let hash_val = T::dict_hash(&val); - if self.key_to_ind.get(&hash_val).is_none() { + let hash_val = T::dict_hash(val); + if let std::collections::hash_map::Entry::Vacant(e) = self.key_to_ind.entry(hash_val) { let ind = self.dictionary.len(); - self.key_to_ind.insert(hash_val, ind as u64); + e.insert(ind as u64); self.dictionary.push(val.clone()); } } @@ -586,7 +586,7 @@ impl, U: Ord + std::hash::Hash + Clone> TypedDict { let mut common_values = vec![]; for val in &other.dictionary { let hash_val = T::dict_hash(val); - if let Some(_) = self.key_to_ind.get(&hash_val) { + if self.key_to_ind.contains_key(&hash_val) { common_values.push(val.clone()); } } diff --git a/fff-poc/src/dict/bottom_k_sketch.rs b/fff-poc/src/dict/bottom_k_sketch.rs index 29cd205..43c398b 100644 --- a/fff-poc/src/dict/bottom_k_sketch.rs +++ b/fff-poc/src/dict/bottom_k_sketch.rs @@ -9,12 +9,12 @@ const K: usize = 2048; const M: usize = 3; // Total complexity is O(M(n log K + c^2 K)), where n is #elements, c is #columns -lazy_static!( +lazy_static! { static ref COEFFS: Vec<(u64, u64)> = { let mut rng = rand::thread_rng(); (0..M).map(|_| (rng.gen(), rng.gen())).collect() }; -); +} pub struct BottomKSketch { /// A heap to store the bottom K hash values @@ -75,14 +75,14 @@ impl BottomKSketch { let self_val = self.bottom_k_arr[i][self_bottom_pos]; let other_val = other.bottom_k_arr[i][other_bottom_pos]; collected += 1; - if self_val == other_val { - common += 1; - self_bottom_pos += 1; - other_bottom_pos += 1; - } else if self_val < other_val { - self_bottom_pos += 1; - } else { - other_bottom_pos += 1; + match self_val.cmp(&other_val) { + std::cmp::Ordering::Less => self_bottom_pos += 1, + std::cmp::Ordering::Equal => { + common += 1; + self_bottom_pos += 1; + other_bottom_pos += 1; + } + std::cmp::Ordering::Greater => other_bottom_pos += 1, } } jaccard += (common as f64) / (collected as f64); diff --git a/fff-poc/src/dict/shared_dictionary_cache.rs b/fff-poc/src/dict/shared_dictionary_cache.rs index e48b32c..3d8471c 100644 --- a/fff-poc/src/dict/shared_dictionary_cache.rs +++ b/fff-poc/src/dict/shared_dictionary_cache.rs @@ -79,7 +79,7 @@ impl SharedDictionaryCache { encoded_chunk_buf, wasm_context .as_ref() - .map(|wasm_context| Arc::clone(&wasm_context)), + .map(Arc::clone), None, )?; let mut arrays = vec![]; diff --git a/fff-poc/src/dict/shared_dictionary_context.rs b/fff-poc/src/dict/shared_dictionary_context.rs index ab5085d..47ab930 100644 --- a/fff-poc/src/dict/shared_dictionary_context.rs +++ b/fff-poc/src/dict/shared_dictionary_context.rs @@ -169,6 +169,7 @@ impl SharedDictionaryContext { self.dictionaries[dict_idx as usize].submit_values(values) } + #[allow(clippy::type_complexity)] pub fn finish_and_flush( &mut self, wasm_context: Arc, diff --git a/fff-poc/src/encoder/logical.rs b/fff-poc/src/encoder/logical.rs index 387769b..a1fb40f 100644 --- a/fff-poc/src/encoder/logical.rs +++ b/fff-poc/src/encoder/logical.rs @@ -86,7 +86,7 @@ impl LogicalColEncoder for FlatColEncoder { for data_chunk in self.data_encoder.encode(array, counter, shared_dict_ctx)? { res.push(data_chunk.update_column_index(self.column_index)); } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn memory_size(&self) -> usize { @@ -102,7 +102,7 @@ impl LogicalColEncoder for FlatColEncoder { for data_chunk in self.data_encoder.finish(counter, shared_dict_ctx)? { res.push(data_chunk.update_column_index(self.column_index)); } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn submit_dict(&mut self, shared_dict_ctx: &mut SharedDictionaryContext) -> Result<()> { @@ -139,7 +139,7 @@ impl LogicalColEncoder for ListColEncoder { { res.extend(values_chunks); } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn memory_size(&self) -> usize { @@ -158,7 +158,7 @@ impl LogicalColEncoder for ListColEncoder { if let Some(values_chunks) = self.values_encoder.finish(counter, shared_dict_ctx)? { res.extend(values_chunks); } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn submit_dict(&mut self, shared_dict_ctx: &mut SharedDictionaryContext) -> Result<()> { @@ -212,7 +212,7 @@ impl LogicalColEncoder for ListOfStructOfPrimitiveColEncoder { } _ => panic!("Expecting List or LargeList data type"), } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn memory_size(&self) -> usize { @@ -233,7 +233,7 @@ impl LogicalColEncoder for ListOfStructOfPrimitiveColEncoder { res.push(field_chunk.update_column_index(*col_idx)); } } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn submit_dict(&mut self, _shared_dict_ctx: &mut SharedDictionaryContext) -> Result<()> { @@ -273,7 +273,7 @@ impl LogicalColEncoder for StructColEncoder { res.extend(field_chunks); } } - Ok(res.is_empty().not().then(|| res)) + Ok(res.is_empty().not().then_some(res)) } fn memory_size(&self) -> usize { @@ -299,7 +299,7 @@ impl LogicalColEncoder for StructColEncoder { res.extend(field_chunks); } } - Ok((!res.is_empty()).then(|| res)) + Ok((!res.is_empty()).then_some(res)) } fn submit_dict(&mut self, shared_dict_ctx: &mut SharedDictionaryContext) -> Result<()> { @@ -311,6 +311,7 @@ impl LogicalColEncoder for StructColEncoder { } } +#[allow(clippy::only_used_in_recursion)] pub fn create_logical_encoder( field: FieldRef, field_id: i32, @@ -324,7 +325,7 @@ pub fn create_logical_encoder( non_nest_types!() => Ok(( Box::new(FlatColEncoder { data_encoder: create_physical_encoder( - &field.data_type(), + field.data_type(), max_chunk_size, field.is_nullable(), wasm_context, @@ -342,11 +343,7 @@ pub fn create_logical_encoder( DataType::Struct(fields) if fields .iter() - .map(|f| match f.data_type() { - non_nest_types!() => true, - _ => false, - }) - .fold(true, |acc, mk| acc && mk) + .all(|f| matches!(f.data_type(), non_nest_types!())) && cfg!(feature = "list-offsets-pushdown") => { Ok(( @@ -372,7 +369,7 @@ pub fn create_logical_encoder( // Validity and Offsets in List are encode together, and a physical encoder is created for them. let offsets_validity_index = column_idx.next_column_index(); let offsets_encoder = create_physical_encoder( - &field.data_type(), + field.data_type(), max_chunk_size, field.is_nullable(), wasm_context.clone(), @@ -469,12 +466,12 @@ fn _extract_offsets_and_validity(list_arr: &dyn Array) -> ArrayRef { match list_arr.data_type() { DataType::List(_) => { let offsets = list_arr.as_list::().offsets().clone(); - let nulls = list_arr.nulls().map(|x| x.clone()); + let nulls = list_arr.nulls().cloned(); Arc::new(Int32Array::new(offsets.into_inner(), nulls)) } DataType::LargeList(_) => { let offsets = list_arr.as_list::().offsets().clone(); - let nulls = list_arr.nulls().map(|x| x.clone()); + let nulls = list_arr.nulls().cloned(); Arc::new(Int64Array::new(offsets.into_inner(), nulls)) } _ => panic!(), @@ -500,6 +497,7 @@ fn extract_validity(list_arr: &dyn Array) -> ArrayRef { mod tests { #[test] + #[allow(clippy::arc_with_non_send_sync)] fn test_list_encoder() { use arrow::array::{Int32Builder, ListBuilder}; use arrow_array::RecordBatch; diff --git a/fff-poc/src/encoder/physical.rs b/fff-poc/src/encoder/physical.rs index 1cc5085..ce7f7e0 100644 --- a/fff-poc/src/encoder/physical.rs +++ b/fff-poc/src/encoder/physical.rs @@ -121,7 +121,7 @@ impl ListOfStructColEncoder { }, self.compression_type, )); - self.accumulated_chunk.num_rows += list_len as usize; + self.accumulated_chunk.num_rows += list_len; if self.accumulated_size > self.column_chunk_size { let chunk = std::mem::take(&mut self.accumulated_chunk); self.accumulated_size = 0; @@ -214,7 +214,7 @@ impl PhysicalColEncoder for EncoderDictColEncoder { }, self.compression_type, )); - self.accumulated_chunk.num_rows += array.len() as usize; + self.accumulated_chunk.num_rows += array.len(); if self.accumulated_size > self.column_chunk_size { let chunk = std::mem::take(&mut self.accumulated_chunk); self.accumulated_size = 0; @@ -538,11 +538,8 @@ impl SharedDictColEncoder { self.buffered_array_mem_size = 0; let dict_idx = match self.submitted_dict_idx { Some(idx) => idx, - None => { - let idx = shared_dict_ctx - .new_dictionary(buffered_arrs.first().unwrap().data_type().clone())?; - idx - } + None => shared_dict_ctx + .new_dictionary(buffered_arrs.first().unwrap().data_type().clone())?, }; let indices_arrs = buffered_arrs .into_iter() @@ -583,14 +580,14 @@ impl SharedDictColEncoder { if self.wasm_context.always_set_custom_wasm_for_built_in() { self.wasm_context.builtin_wasm_id() } else { - self.wasm_context.data_type_to_wasm_id(&arr.data_type()) + self.wasm_context.data_type_to_wasm_id(arr.data_type()) } .map(|id| WASMEncoding::new(id.0, Vec::new())), )? }, self.compression_type, )); - accumulated_chunk.num_rows += arr.len() as usize; + accumulated_chunk.num_rows += arr.len(); if accumulated_size > self.column_chunk_size { res.push(accumulated_chunk); accumulated_chunk = EncodedColumnChunk::builder() @@ -616,7 +613,7 @@ impl PhysicalColEncoder for SharedDictColEncoder { if self .buffered_arrays .first() - .map_or(false, |buf_arr| *buf_arr.data_type() != *array.data_type()) + .is_some_and(|buf_arr| *buf_arr.data_type() != *array.data_type()) { Err(fff_core::errors::Error::General( "Datatypes of arrays do not match".to_owned(), @@ -787,14 +784,14 @@ impl GLBestEncoder { if self.wasm_context.always_set_custom_wasm_for_built_in() { self.wasm_context.builtin_wasm_id() } else { - self.wasm_context.data_type_to_wasm_id(&arr.data_type()) + self.wasm_context.data_type_to_wasm_id(arr.data_type()) } .map(|id| WASMEncoding::new(id.0, Vec::new())), )? }, self.compression_type, )); - accumulated_chunk.num_rows += arr.len() as usize; + accumulated_chunk.num_rows += arr.len(); // Only split to multiple chunks for indices if dict_idx.is_some() && accumulated_size > self.column_chunk_size { res.push(accumulated_chunk); @@ -821,7 +818,7 @@ impl PhysicalColEncoder for GLBestEncoder { if self .buffered_arrays .first() - .map_or(false, |buf_arr| *buf_arr.data_type() != *array.data_type()) + .is_some_and(|buf_arr| *buf_arr.data_type() != *array.data_type()) { Err(fff_core::errors::Error::General( "Datatypes of arrays do not match".to_owned(), @@ -960,7 +957,7 @@ impl PhysicalColEncoder for GLBestEncoder { } } -/// A specific encoder containing the encoding of both validity and offsets for a List Array. +// /// A specific encoder containing the encoding of both validity and offsets for a List Array. // pub struct ListOffsetEncoder { // accumulated_chunk: EncodedColumnChunk, // accumulated_size: u64, @@ -1002,7 +999,7 @@ impl PhysicalColEncoder for GLBestEncoder { // } // } -/// Should only be useful for Struct fields +// /// Should only be useful for Struct fields // pub struct ValidityEncoder { // accumulated_chunk: EncodedColumnChunk, // accumulated_size: u64, @@ -1128,7 +1125,7 @@ mod tests { .encode(a.clone(), &mut counter, &mut shared_dict_ctx) .unwrap(); let res = encoder.finish(&mut counter, &mut shared_dict_ctx).unwrap(); - assert_eq!(res.is_empty(), false); + assert!(!res.is_empty()); let res = res.first().unwrap(); let encunit = res.encunits[0].clone(); assert_eq!( @@ -1173,8 +1170,8 @@ mod tests { .encode(a.clone(), &mut counter, &mut shared_dict_ctx) .unwrap(); let res = encoder.finish(&mut counter, &mut shared_dict_ctx).unwrap(); - assert_eq!(_res.is_empty(), true); - assert_eq!(res.is_empty(), false); + assert!(_res.is_empty()); + assert!(!res.is_empty()); let res = res.first().unwrap(); let dict_encunit = res.encunits[0].clone(); assert_eq!( diff --git a/fff-poc/src/file/footer.rs b/fff-poc/src/file/footer.rs index f4d2067..e1678de 100644 --- a/fff-poc/src/file/footer.rs +++ b/fff-poc/src/file/footer.rs @@ -45,30 +45,6 @@ pub struct PostScript { pub minor_version: u16, } -impl PostScript { - pub fn new( - metadata_size: u32, - footer_size: u32, - compression: fb::CompressionType, - checksum_type: ChecksumType, - data_checksum: u64, - schema_checksum: u64, - major_version: u16, - minor_version: u16, - ) -> Self { - Self { - metadata_size, - footer_size, - compression, - checksum_type, - data_checksum, - schema_checksum, - major_version, - minor_version, - } - } -} - /// Maps an encoding type to its semantic version #[derive(Clone, Debug)] pub struct EncodingVersion { @@ -118,8 +94,9 @@ pub(crate) fn create_default_encoding_versions() -> Result> Ok(encoding_versions) } -#[derive(Clone)] +#[derive(Clone, Default)] pub(crate) enum DictionaryEncoding { + #[default] NoDictionary, /// Stores EncBlockIndex of dictionary chunks Dictionary(Vec), @@ -127,12 +104,6 @@ pub(crate) enum DictionaryEncoding { SharedDictionary(u32), } -impl Default for DictionaryEncoding { - fn default() -> Self { - DictionaryEncoding::NoDictionary - } -} - /// ColumnMetadata (direct) for writer to use. /// Reader should use [fb::ColumnMetadata](fff_format::File::fff::flatbuf::ColumnMetadata) directly. #[derive(Default, Clone)] @@ -271,7 +242,7 @@ impl ToFlatBuffer for Chunk { Some(fb::NoDictionary::create(fbb, &fb::NoDictionaryArgs {}).as_union_value()), ), DictionaryEncoding::Dictionary(idxs) => { - let dictionary_encunit_idxs = Some(fbb.create_vector(&idxs)); + let dictionary_encunit_idxs = Some(fbb.create_vector(idxs)); ( fb::DictionaryEncoding::LocalDictionary, Some( @@ -334,12 +305,7 @@ impl From<&fb::WASMEncoding<'_>> for WASMEncoding { fn from(fb: &fb::WASMEncoding) -> Self { Self { wasm_id: fb.wasm_id(), - mini_encunit_sizes: fb - .mini_encunit_sizes() - .unwrap() - .into_iter() - .map(|x| x as u32) - .collect(), + mini_encunit_sizes: fb.mini_encunit_sizes().unwrap().into_iter().collect(), } } } @@ -396,10 +362,9 @@ impl From<&fb::Encoding<'_>> for Encoding { fn from(fb: &fb::Encoding) -> Self { Self { encoding_type: fb.type_(), - wasm_encoding: match fb.wasm_encoding() { - Some(fb_wasm_encoding) => Some(WASMEncoding::from(&fb_wasm_encoding)), - _ => None, - }, + wasm_encoding: fb + .wasm_encoding() + .map(|fb_wasm_encoding| WASMEncoding::from(&fb_wasm_encoding)), } } } @@ -428,10 +393,10 @@ impl ToFlatBuffer for Encoding { type Target<'a> = fb::Encoding<'a>; fn to_fb<'fb>(&self, fbb: &mut FlatBufferBuilder<'fb>) -> WIPOffset> { - let wasm_encoding = match &self.wasm_encoding { - Some(wasm_encoding) => Some(wasm_encoding.to_fb(fbb)), - _ => None, - }; + let wasm_encoding = self + .wasm_encoding + .as_ref() + .map(|wasm_encoding| wasm_encoding.to_fb(fbb)); fb::Encoding::create( fbb, &fb::EncodingArgs { @@ -704,10 +669,8 @@ impl<'a> Footer<'a> { let column_metadatas: Vec = row_group_projected_col_bufs .iter() .map(|buf| { - flatbuffers::root::( - &buf, - ) - .unwrap() + flatbuffers::root::(buf) + .unwrap() }) .collect(); GroupedColumnMetadata { @@ -768,7 +731,7 @@ impl<'a> Footer<'a> { .collect(); GroupedColumnMetadata { column_metadatas, - row_count: row_count, + row_count, _offset: offset, _size: size, } @@ -796,6 +759,7 @@ impl<'a> Footer<'a> { } } +#[allow(clippy::type_complexity)] pub fn parse_footer<'a>( footer_fbs: &fb::Footer<'a>, ) -> Result<( diff --git a/fff-poc/src/options.rs b/fff-poc/src/options.rs index a9535c4..d45241a 100644 --- a/fff-poc/src/options.rs +++ b/fff-poc/src/options.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, u64}; +use std::collections::HashMap; use arrow_schema::DataType; use fff_format::File::fff::flatbuf::CompressionType; @@ -147,10 +147,7 @@ impl FileWriterOptionsBuilder { /// Finalizes the configuration and returns immutable writer properties struct. pub fn build(self) -> FileWriterOptions { // TODO: better way of separting built-in wasm and custom extension wasm - assert!( - self.write_built_in_wasm && self.custom_encoding_options.len() == 0 - || !self.write_built_in_wasm - ); + assert!(!self.write_built_in_wasm || self.custom_encoding_options.is_empty()); FileWriterOptions { iounit_size: self.iounit_size, encoding_unit_len: self.encoding_unit_len, @@ -243,6 +240,10 @@ impl CustomEncodingOptions { self.wasms.len() } + pub fn is_empty(&self) -> bool { + self.wasms.len() == 0 + } + pub fn into_context(self) -> WASMWritingContext { WASMWritingContext::with_custom_wasms(self.wasms, self.data_type_to_wasm_id) } diff --git a/fff-poc/src/reader/builder.rs b/fff-poc/src/reader/builder.rs index abc9282..27c99a3 100644 --- a/fff-poc/src/reader/builder.rs +++ b/fff-poc/src/reader/builder.rs @@ -177,7 +177,7 @@ impl FileReaderV2Builder { row_groups_pointer.sizes().unwrap().iter() ) .map(|(row_count, offset, size)| RowGroupCntNPointer { - row_count: row_count, + row_count, _offset: offset, _size: size, }) @@ -220,7 +220,6 @@ impl FileReaderV2Builder { .col_metadatas() .unwrap() .into_iter() - .map(|v| v) .collect(), Projection::LeafColumnIndexes(ref projections) => { let mut column_meta_offsets = vec![]; diff --git a/fff-poc/src/reader/mod.rs b/fff-poc/src/reader/mod.rs index d65f552..67f135d 100644 --- a/fff-poc/src/reader/mod.rs +++ b/fff-poc/src/reader/mod.rs @@ -44,7 +44,7 @@ pub fn get_max_chunk_size(reader: R) -> Result { let mut max_size = 0; let rg_metas = footer.row_group_metadatas(); for rg_meta in rg_metas { - for (_, col_meta) in rg_meta.column_metadatas.iter().enumerate() { + for col_meta in rg_meta.column_metadatas.iter() { col_meta.column_chunks().unwrap().iter().for_each(|chunk| { // log::error!("chunk size: {}", chunk.size_()); max_size = std::cmp::max(max_size, chunk.size_() as usize); @@ -127,6 +127,7 @@ impl FileReaderV2 { ) } + #[allow(clippy::type_complexity)] pub fn get_shared_dict_sizes( &mut self, ) -> Result<(Vec, Vec>)> { @@ -204,7 +205,7 @@ fn read_file_based_on_footer( let mut record_batches = vec![]; let rg_metas = footer.row_group_metadatas(); // let projections = projections.map(|vec| vec.iter().map(|v| *v).collect::>()); - let selected_rg_metas = process_selection(selection, &rg_metas); + let selected_rg_metas = process_selection(selection, rg_metas); for (rg_meta, selection_in_rg) in selected_rg_metas { let mut column_idx = ColumnIndexSequence::default(); let mut columns = vec![]; @@ -214,9 +215,7 @@ fn read_file_based_on_footer( Arc::clone(field), &rg_meta.column_metadatas, &mut column_idx, - wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, checksum_type, )?; @@ -261,6 +260,7 @@ fn read_file_based_on_footer( Ok(record_batches) } +#[allow(clippy::type_complexity)] fn get_shared_dict_size_based_on_footer( footer: Footer, shared_dictionary_cache: &SharedDictionaryCache, @@ -343,9 +343,7 @@ fn point_access_list_struct( Arc::clone(&top_col_field), &rg_meta.column_metadatas, &mut column_idx, - wasm_context - .as_ref() - .map(|wasm_context| Arc::clone(wasm_context)), + wasm_context.as_ref().map(Arc::clone), shared_dictionary_cache, )?; let arrays = col_decoder.decode_batch_at_with_proj(col_leaf_id as usize, row_id, 1)?; @@ -377,10 +375,10 @@ fn point_access_list_struct( Ok(record_batches) } -fn collect_stat_for_col<'a>( +fn collect_stat_for_col( field: FieldRef, field_id: i32, - column_metas: &Vec>, + column_metas: &Vec>, column_idx: &mut ColumnIndexSequence, ) -> Result<()> { let column_index = column_idx.next_column_index(); @@ -529,16 +527,16 @@ fn read_postscript(reader: &R, file_size: u64) -> Result) -> Self { - Self::LeafColumnIndexes(indices.as_ref().iter().copied().collect()) + Self::LeafColumnIndexes(indices.as_ref().to_vec()) } -} \ No newline at end of file +} diff --git a/fff-poc/src/reader/selection.rs b/fff-poc/src/reader/selection.rs index a37a860..960ed4f 100644 --- a/fff-poc/src/reader/selection.rs +++ b/fff-poc/src/reader/selection.rs @@ -7,6 +7,6 @@ pub enum Selection { impl Selection { pub fn new(indices: impl AsRef<[u64]>) -> Self { - Self::RowIndexes(indices.as_ref().iter().copied().collect()) + Self::RowIndexes(indices.as_ref().to_vec()) } } diff --git a/fff-poc/src/writer.rs b/fff-poc/src/writer.rs index 966f5da..64627d9 100644 --- a/fff-poc/src/writer.rs +++ b/fff-poc/src/writer.rs @@ -168,6 +168,7 @@ where // } } +#[allow(clippy::arc_with_non_send_sync)] pub struct FileWriter { schema: Schema, column_encoders: Vec>, @@ -181,13 +182,14 @@ pub struct FileWriter { } impl FileWriter { + #[allow(clippy::arc_with_non_send_sync)] pub fn try_new(schema: SchemaRef, writer: W, mut options: FileWriterOptions) -> Result { let checksum_type = options.checksum_type(); let mut column_idx = ColumnIndexSequence::default(); let wasm_context = Arc::new( match ( options.write_built_in_wasm(), - options.custom_encoding_options().len() > 0, + !options.custom_encoding_options().is_empty(), ) { (true, false) => WASMWritingContext::default_with_always_set_custom_wasm(), (false, true) => options.take_custom_encoding_options().into_context(), @@ -271,15 +273,13 @@ impl FileWriter { .try_for_each(|chunk| self.state.flush_chunk(chunk))?; }; } - } else { - if let Some(res) = encoder.encode( - col.clone(), - &mut self.state.column_counters[i], - &mut self.shared_dictionary_context, - )? { - res.into_iter() - .try_for_each(|chunk| self.state.flush_chunk(chunk))?; - }; + } else if let Some(res) = encoder.encode( + col.clone(), + &mut self.state.column_counters[i], + &mut self.shared_dictionary_context, + )? { + res.into_iter() + .try_for_each(|chunk| self.state.flush_chunk(chunk))?; } } self.state.num_rows_in_file += batch.num_rows() as u32; @@ -349,7 +349,7 @@ impl FileWriter { self.state.write_and_update_file_level_checksum(wasm)?; let size = self.state.writer.stream_position()? - offset; let mut b = fb::MetadataSectionBuilder::new(&mut fbb); - b.add_offset(offset as u64); + b.add_offset(offset); b.add_size_(size as u32); b.add_compression_type(CompressionType::Uncompressed); Ok(b.finish()) @@ -450,7 +450,7 @@ impl FileWriter { let optional_metadata_section = { let name = fbb.create_string("WASMBinaries"); let names = fbb.create_vector(&[name]); - let offsets = fbb.create_vector(&[wasm_meta_start as u64]); + let offsets = fbb.create_vector(&[wasm_meta_start]); let sizes = fbb.create_vector(&[wasm_meta_size as u32]); let compression_types = fbb.create_vector(&[CompressionType::Uncompressed]); let mut builder = fb::OptionalMetadataSectionsBuilder::new(&mut fbb); diff --git a/fff-poc/tests/e2e.rs b/fff-poc/tests/e2e.rs index c89cb55..b8f7bc4 100644 --- a/fff-poc/tests/e2e.rs +++ b/fff-poc/tests/e2e.rs @@ -35,9 +35,8 @@ fn read_parquet_file(file_path: impl AsRef, batch_size: usize) -> Vec, o: &Arc) { - if i.len() != o.len() { - assert!(false); - } + assert_eq!(i.len(), o.len()); + if o.data_type().is_primitive() || matches!(o.data_type(), DataType::Binary | DataType::Utf8) { assert_eq!(i, o) } else if o.as_byte_view_opt::().is_some() { @@ -68,7 +67,7 @@ fn array_equal(i: &Arc, o: &Arc) { } else if let DataType::List(_) = o.data_type() { let i = i.as_list::(); let o = o.as_list::(); - array_equal(&i.values(), &o.values()); + array_equal(i.values(), o.values()); } else { unimplemented!() } @@ -99,11 +98,10 @@ fn test_read( take_record_batch(&input_single_batch, &UInt64Array::from(indexes)).unwrap() } }; - for (_i, (i_col, o_col)) in input_single_batch + for (i_col, o_col) in input_single_batch .columns() .iter() .zip(output_single_batch.columns().iter()) - .enumerate() { // println!("{i}"); // if i == 8 { @@ -485,10 +483,11 @@ async fn test_object_store() { let mut file = std::fs::OpenOptions::new() .write(true) .create(true) + .truncate(true) .open( Path::new(env!("CARGO_MANIFEST_DIR")) .join("data") - .join(&file_name), + .join(file_name), ) .unwrap(); write_batches( @@ -500,10 +499,11 @@ async fn test_object_store() { AmazonS3Builder::from_env() .with_url("s3://future-file-format/") .with_retry({ - let mut res = object_store::RetryConfig::default(); - res.retry_timeout = std::time::Duration::from_secs(10); - res.max_retries = 1; - res + object_store::RetryConfig { + retry_timeout: std::time::Duration::from_secs(10), + max_retries: 1, + ..Default::default() + } }) .build() .unwrap(), @@ -522,10 +522,11 @@ async fn test_s3() { AmazonS3Builder::from_env() .with_url("s3://future-file-format/") .with_retry({ - let mut res = object_store::RetryConfig::default(); - res.retry_timeout = std::time::Duration::from_secs(10); - res.max_retries = 1; - res + object_store::RetryConfig { + retry_timeout: std::time::Duration::from_secs(10), + max_retries: 1, + ..Default::default() + } }) .build() .unwrap(), @@ -673,7 +674,7 @@ fn test_core(#[case] enable_built_in_wasm: bool) { // let original_file = "/mnt/nvme0n1/xinyu/tmp/12_str.parquet"; // let original_file = "/mnt/nvme0n1/xinyu/tmp/core_no_double.parquet"; let original_file = "/mnt/nvme0n1/xinyu/data/parquet/core.parquet"; - let batches: Vec = read_parquet_file(&original_file, 64 * 1024); + let batches: Vec = read_parquet_file(original_file, 64 * 1024); test_read_file_roundtrip( &batches, Projection::default(), @@ -705,7 +706,7 @@ fn test_pco_custom_wasm() { WASMId(0), WasmLib::new( "/home/xinyu/fff-devel/target/release/libfff_ude_example_pco_real_encoder.so".into(), - std::fs::read("/home/xinyu/fff-devel/target/wasm32-wasip1/opt-size-lvl3/fff_ude_example_pco_real.wasm").unwrap().into(), + std::fs::read("/home/xinyu/fff-devel/target/wasm32-wasip1/opt-size-lvl3/fff_ude_example_pco_real.wasm").unwrap(), ), )]); let mut data_type_to_wasm_id: HashMap = HashMap::new(); diff --git a/fff-ude-wasm/src/lib.rs b/fff-ude-wasm/src/lib.rs index c03578f..16e5ae5 100644 --- a/fff-ude-wasm/src/lib.rs +++ b/fff-ude-wasm/src/lib.rs @@ -18,7 +18,6 @@ use anyhow::{anyhow, bail, ensure, Context}; use arrow_buffer::Buffer; -use once_cell; use ram_file::{RamFile, RamFileRef}; use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::Debug; @@ -85,6 +84,7 @@ impl Debug for Config { } } +#[allow(clippy::type_complexity)] pub struct Instance { // extern "C" fn(len: usize, align: usize) -> *mut u8 alloc: TypedFunc<(u32, u32), u32>, @@ -392,12 +392,12 @@ impl Runtime { assert!(output.is_ok(), "error: {:?}", output.as_ref().err()); let output = output.unwrap(); // put the instance back to the pool if no more results - if output.is_none() { + if let Some(output) = output { + drop(guard); + Ok(StreamReadResult::Batch((output, instance))) + } else { self.instances.lock().unwrap().push_back(instance.clone()); Ok(StreamReadResult::End) - } else { - drop(guard); - Ok(StreamReadResult::Batch((output.unwrap(), instance))) } } @@ -514,6 +514,10 @@ impl WasmSlice { pub fn len(&self) -> u32 { self.len } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } } impl Instance { diff --git a/fff-ude/src/ffi.rs b/fff-ude/src/ffi.rs index ead89cd..08e88f2 100644 --- a/fff-ude/src/ffi.rs +++ b/fff-ude/src/ffi.rs @@ -103,7 +103,7 @@ pub unsafe fn scalar_wrapper( // pub len: usize, // } -/// This does not work because if returning a struct, wasm FFI simply requires one extra input param of the address of the struct. +// /// This does not work because if returning a struct, wasm FFI simply requires one extra input param of the address of the struct. // pub unsafe fn scalar_wrapper_v2(function: ScalarDecode, ptr: *const u8, len: usize) -> ResSlice { // let input = std::slice::from_raw_parts(ptr, len); // match call_primitive(function, input) { @@ -174,11 +174,11 @@ fn call_string(function: StringDecode, input_bytes: &[u8]) -> Result Vec { } pub fn ppd_deserialize(bytes: &[u8]) -> &ArchivedPPDExpr { - rkyv::access::(&bytes[..]).unwrap() + rkyv::access::(bytes).unwrap() } diff --git a/fff-ude/src/lib.rs b/fff-ude/src/lib.rs index d178ede..319ba51 100644 --- a/fff-ude/src/lib.rs +++ b/fff-ude/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::missing_safety_doc)] + use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::ArrayData; pub use fff_core::errors::Result; diff --git a/third_party/lance-datagen/src/generator.rs b/third_party/lance-datagen/src/generator.rs index 3bce6ee..e2eb9c4 100644 --- a/third_party/lance-datagen/src/generator.rs +++ b/third_party/lance-datagen/src/generator.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{iter, marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::Arc}; use arrow::{ array::{ArrayData, AsArray}, @@ -170,7 +170,7 @@ impl ArrayGenerator for NullGenerator { } } else { let array_len = array.len(); - let num_validity_bytes = (array_len + 7) / 8; + let num_validity_bytes = array_len.div_ceil(8); let mut null_count = 0; // Sampling the RNG once per bit is kind of slow so we do this to sample once // per byte. We only get 8 bits of RNG resolution but that should be good enough. @@ -493,7 +493,7 @@ impl ArrayGenerator for RandomBooleanGenerator { length: RowCount, rng: &mut rand_xoshiro::Xoshiro256PlusPlus, ) -> Result, ArrowError> { - let num_bytes = (length.0 + 7) / 8; + let num_bytes = length.0.div_ceil(8); let mut bytes = vec![0; num_bytes as usize]; rng.fill_bytes(&mut bytes); let bytes = BooleanBuffer::new(Buffer::from_vec(bytes), 0, length.0 as usize); @@ -687,7 +687,7 @@ impl ArrayGenerator for RandomBinaryGenerator { let bytes = Buffer::from_vec(bytes); if self.is_large { let offsets = OffsetBuffer::from_lengths( - iter::repeat(self.bytes_per_element.0 as usize).take(length.0 as usize), + std::iter::repeat_n(self.bytes_per_element.0 as usize, length.0 as usize), ); if self.scale_to_utf8 { // This is safe because we are only using printable characters @@ -705,7 +705,7 @@ impl ArrayGenerator for RandomBinaryGenerator { } } else { let offsets = OffsetBuffer::from_lengths( - iter::repeat(self.bytes_per_element.0 as usize).take(length.0 as usize), + std::iter::repeat_n(self.bytes_per_element.0 as usize, length.0 as usize), ); if self.scale_to_utf8 { // This is safe because we are only using printable characters @@ -844,7 +844,7 @@ impl ArrayGenerator for FixedBinaryGenerator { .copied(), )); let offsets = - OffsetBuffer::from_lengths(iter::repeat(self.value.len()).take(length.0 as usize)); + OffsetBuffer::from_lengths(std::iter::repeat_n(self.value.len(), length.0 as usize)); Ok(Arc::new(arrow_array::GenericByteArray::::new( offsets, bytes, None, ))) diff --git a/wasm-libs/fff-ude-example-fsst/src/lib.rs b/wasm-libs/fff-ude-example-fsst/src/lib.rs index 57cae17..a9536ff 100644 --- a/wasm-libs/fff-ude-example-fsst/src/lib.rs +++ b/wasm-libs/fff-ude-example-fsst/src/lib.rs @@ -1,5 +1,3 @@ -use std::vec; - use arrow_buffer::{Buffer, MutableBuffer}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::Bytes; @@ -22,14 +20,12 @@ fn decode_fsst_general(input: &[u8]) -> Result>> let symbols = bytes.split_to(symbols_size as usize); let symbols = bytemuck::cast_slice(&symbols); let lengths = bytes.split_to(length_size as usize); - let decompressor = Decompressor::new(&symbols, &lengths); + let decompressor = Decompressor::new(symbols, &lengths); let output = Buffer::from(decompressor.decompress(&bytes)); // Const ptr should not be dropped std::mem::forget(bytes); - let mut res: Vec = vec![]; // assume no nulls - res.push(MutableBuffer::from_len_zeroed(0).into()); - res.push(output); + let res = [MutableBuffer::from_len_zeroed(0).into(), output]; Ok(Box::new(res.into_iter())) } diff --git a/wasm-libs/fff-ude-example-noop/src/lib.rs b/wasm-libs/fff-ude-example-noop/src/lib.rs index 2f97050..b52ee38 100644 --- a/wasm-libs/fff-ude-example-noop/src/lib.rs +++ b/wasm-libs/fff-ude-example-noop/src/lib.rs @@ -8,7 +8,7 @@ fn noop(_input: &[u8]) -> Result> { // black_box(some_buffer); // Th answer is yes. // let vec = Vec::new(); - let vec = unsafe { Vec::from_raw_parts(0 as *mut u8, 0, 0) }; + let vec = unsafe { Vec::from_raw_parts(std::ptr::null_mut::(), 0, 0) }; Ok(vec.into_boxed_slice()) } diff --git a/wasm-libs/test-size/src/lib.rs b/wasm-libs/test-size/src/lib.rs index b85e1f9..f07ed11 100644 --- a/wasm-libs/test-size/src/lib.rs +++ b/wasm-libs/test-size/src/lib.rs @@ -1,10 +1,10 @@ -/// A very simple Wasm lib to test the output size of Wasm between Wasi and unknown +//! A very simple Wasm lib to test the output size of Wasm between Wasi and unknown #[unsafe(no_mangle)] pub unsafe extern "C" fn decode_vortex_general_ffi(input: i32) -> i32 { if input == 0 { - return 1; + 1 } else { - return input * decode_vortex_general_ffi(input - 1); + input * decode_vortex_general_ffi(input - 1) } } diff --git a/wasm-libs/test-wmemcheck/src/lib.rs b/wasm-libs/test-wmemcheck/src/lib.rs index eae816b..419d8d1 100644 --- a/wasm-libs/test-wmemcheck/src/lib.rs +++ b/wasm-libs/test-wmemcheck/src/lib.rs @@ -1,11 +1,10 @@ -/// A very simple wasm lib to verify that wmemcheck will fail only calling `alloc` +//! A very simple wasm lib to verify that wmemcheck will fail only calling `alloc` /// Allocate memory. /// /// # Safety /// /// See [`std::alloc::GlobalAlloc::alloc`]. - #[unsafe(no_mangle)] pub unsafe extern "C" fn alloc(len: usize, align: usize) -> *mut u8 { std::alloc::alloc(std::alloc::Layout::from_size_align_unchecked(len, align)) diff --git a/wasm-libs/wasm-test-encoders/src/lib.rs b/wasm-libs/wasm-test-encoders/src/lib.rs index 337fcb4..45a3575 100644 --- a/wasm-libs/wasm-test-encoders/src/lib.rs +++ b/wasm-libs/wasm-test-encoders/src/lib.rs @@ -14,6 +14,7 @@ use arrow_array::{ }; use arrow_buffer::{Buffer, MutableBuffer}; use arrow_schema::DataType; +use bytemuck::AnyBitPattern; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::Bytes; use fastlanes::BitPacking; @@ -131,109 +132,54 @@ pub fn encode_flsbp_general( encoded_data } +#[allow(clippy::uninit_vec)] +fn segment_to_buffer( + input: &[u8], + bitwidth: usize, + len: usize, +) -> Buffer { + let input = bytemuck::cast_slice(input); + let mut output = Vec::::with_capacity(len); + unsafe { + output.set_len(len); + } + let packed_len = 128 * bitwidth / size_of::(); + for (start, end) in (0..len).step_by(1024).map(|i| (i, i + 1024)) { + let i = start / 1024; + unsafe { + BitPacking::unchecked_unpack( + bitwidth, + &input[i * packed_len..(i + 1) * packed_len], + &mut output[start..end], + ); + } + } + Buffer::from(output) +} + fn decode_fls_bp(input: &[u8], _wasm: bool) -> Result>> { let len: u32 = (&input[input.len() - 4..input.len()]).read_u32::()?; let bitwidth: u32 = (&input[input.len() - 8..input.len() - 4]).read_u32::()?; let typeid: u32 = (&input[input.len() - 12..input.len() - 8]).read_u32::()?; let input = &input[0..input.len() - 12]; let output_buffer = match typeid { - 0 => { - type T = u8; - let input = bytemuck::cast_slice(input); - let mut output = Vec::::new(); - let packed_len = 128 * bitwidth as usize / size_of::(); - output.reserve(len as usize); - unsafe { - output.set_len(len as usize); - } - for (start, end) in (0..len as usize).step_by(1024).map(|i| (i, i + 1024)) { - let i = start / 1024; - unsafe { - BitPacking::unchecked_unpack( - bitwidth as usize, - &input[i * packed_len..(i + 1) * packed_len], - &mut output[start..end], - ); - } - } - Buffer::from(output) - } - 1 => { - type T = u16; - let input = bytemuck::cast_slice(input); - let mut output = Vec::::new(); - let packed_len = 128 * bitwidth as usize / size_of::(); - output.reserve(len as usize); - unsafe { - output.set_len(len as usize); - } - for (start, end) in (0..len as usize).step_by(1024).map(|i| (i, i + 1024)) { - let i = start / 1024; - unsafe { - BitPacking::unchecked_unpack( - bitwidth as usize, - &input[i * packed_len..(i + 1) * packed_len], - &mut output[start..end], - ); - } - } - Buffer::from(output) - } - 2 => { - type T = u32; - let input = bytemuck::cast_slice(input); - let mut output = Vec::::new(); - let packed_len = 128 * bitwidth as usize / size_of::(); - output.reserve(len as usize); - unsafe { - output.set_len(len as usize); - } - for (start, end) in (0..len as usize).step_by(1024).map(|i| (i, i + 1024)) { - let i = start / 1024; - unsafe { - BitPacking::unchecked_unpack( - bitwidth as usize, - &input[i * packed_len..(i + 1) * packed_len], - &mut output[start..end], - ); - } - } - Buffer::from(output) - } - 3 => { - type T = u64; - let input = bytemuck::cast_slice(input); - let mut output = Vec::::new(); - let packed_len = 128 * bitwidth as usize / size_of::(); - output.reserve(len as usize); - unsafe { - output.set_len(len as usize); - } - for (start, end) in (0..len as usize).step_by(1024).map(|i| (i, i + 1024)) { - let i = start / 1024; - unsafe { - BitPacking::unchecked_unpack( - bitwidth as usize, - &input[i * packed_len..(i + 1) * packed_len], - &mut output[start..end], - ); - } - } - Buffer::from(output) - } + 0 => segment_to_buffer::(input, bitwidth as usize, len as usize), + 1 => segment_to_buffer::(input, bitwidth as usize, len as usize), + 2 => segment_to_buffer::(input, bitwidth as usize, len as usize), + 3 => segment_to_buffer::(input, bitwidth as usize, len as usize), _ => panic!(), }; - let mut res: Vec = vec![]; - // assume no nulls. We omit Nulls decode for purely testing the codec. - res.push(MutableBuffer::from_len_zeroed(0).into()); - res.push(output_buffer); // if wasm { // unsafe { // dealloc(input.as_ptr() as *mut u8, input.len(), 1); // } // } - Ok(Box::new(res.into_iter())) + + // assume no nulls. We omit Nulls decode for purely testing the codec. + Ok(Box::new( + [MutableBuffer::from_len_zeroed(0).into(), output_buffer].into_iter(), + )) } pub fn decode_flsbp_general(input: &[u8]) -> Result>> { @@ -277,16 +223,17 @@ pub fn decode_pco_general(input: &[u8]) -> Result Buffer::from(simple_decompress::(input).unwrap()), _ => panic!(), }; - let mut res: Vec = vec![]; + // assume no nulls. We omit Nulls decode for purely testing the codec. - res.push(MutableBuffer::from_len_zeroed(0).into()); - res.push(recovered); - Ok(Box::new(res.into_iter())) + Ok(Box::new( + [MutableBuffer::from_len_zeroed(0).into(), recovered].into_iter(), + )) } /// FIXME: We cannot return RustBuffer in the desired dylib because the underlying lib may be conpiled from C or other languages. /// In that sense, like the Arrow's FFI, it should also return a function pointer on how to drop this returned buffer. /// We will fix this. Or find others who did this before. +#[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn encode_pco_real_general_c( input: FFI_ArrowArray, schema: FFI_ArrowSchema, @@ -365,7 +312,7 @@ where _ => panic!("Unsupported type"), }; let mut encoded = vec![]; - let null_flag: u32 = input.nulls().is_some().then(|| 1).unwrap_or(0); + let null_flag: u32 = if input.nulls().is_some() { 1 } else { 0 }; encoded.extend_from_slice(&null_flag.to_le_bytes()); if let Some(buf) = input.nulls() { encoded.extend_from_slice(&((buf.buffer().len() as u32).to_le_bytes())); @@ -409,6 +356,7 @@ pub fn decode_pco_real_general(input: &[u8]) -> Result panic!("Unsupported type"), }; let mut encoded = vec![]; - let null_flag: u32 = input.nulls().is_some().then(|| 1).unwrap_or(0); + let null_flag: u32 = if input.nulls().is_some() { 1 } else { 0 }; encoded.extend_from_slice(&null_flag.to_le_bytes()); if let Some(buf) = input.nulls() { encoded.extend_from_slice(&((buf.buffer().len() as u32).to_le_bytes())); @@ -504,7 +452,7 @@ where // Write down dict // Iterate dict in the order of idx // Step 1: Collect the keys and values into a vector of tuples - let mut pairs: Vec<_> = dict.iter().map(|(k, v)| (k, v)).collect(); + let mut pairs: Vec<_> = dict.iter().collect(); // Step 2: Sort the vector based on the values pairs.sort_by(|a, b| a.1.cmp(b.1)); for (key, _) in pairs.into_iter() { @@ -549,7 +497,7 @@ fn custom_decompress(input: &[u8], len: u32) -> Result> { let dict_start = len as usize / 128; let mut res = Vec::new(); for i in 0..dict_start { - let idx = input[i as usize]; + let idx = input[i]; let start = dict_start + (idx as usize) * 128 * size_of::(); let end = start + 128 * size_of::(); res.extend_from_slice(&input[start..end]); @@ -562,12 +510,12 @@ pub fn encode_lz4_general(input: &[u8]) -> Vec { } pub fn decode_lz4_general(input: &[u8]) -> Result>> { let out = decompress_size_prepended(input).unwrap(); - let mut res: Vec = vec![]; + // assume no nulls - res.push(MutableBuffer::from_len_zeroed(0).into()); // This is incorrect because the result is not Arrow Array. We omit it here simply for testing decoding performance. - res.push(Buffer::from(out)); - Ok(Box::new(res.into_iter())) + Ok(Box::new( + [MutableBuffer::from_len_zeroed(0).into(), Buffer::from(out)].into_iter(), + )) } pub fn encode_gzip_general(input: &[u8]) -> Vec { @@ -579,12 +527,16 @@ pub fn decode_gzip_general(input: &[u8]) -> Result = vec![]; + // assume no nulls - res.push(MutableBuffer::from_len_zeroed(0).into()); - // This is incorrect because the result is not Arrow Array. We omit it here simply for testing decoding performance. - res.push(Buffer::from(s.into_bytes())); - Ok(Box::new(res.into_iter())) + Ok(Box::new( + [ + MutableBuffer::from_len_zeroed(0).into(), + // This is incorrect because the result is not Arrow Array. We omit it here simply for testing decoding performance. + Buffer::from(s.into_bytes()), + ] + .into_iter(), + )) } pub fn encode_zstd_general(input: &[u8]) -> Vec { @@ -597,12 +549,16 @@ pub fn encode_zstd_general(input: &[u8]) -> Vec { pub fn decode_zstd_general(input: &[u8]) -> Result>> { let mut out = Vec::new(); zstd::stream::copy_decode(input, &mut out).unwrap(); - let mut res: Vec = vec![]; + // assume no nulls - res.push(MutableBuffer::from_len_zeroed(0).into()); - // This is incorrect because the result is not Arrow Array. We omit it here simply for testing decoding performance. - res.push(Buffer::from_vec(out)); - Ok(Box::new(res.into_iter())) + Ok(Box::new( + [ + MutableBuffer::from_len_zeroed(0).into(), + // This is incorrect because the result is not Arrow Array. We omit it here simply for testing decoding performance. + Buffer::from_vec(out), + ] + .into_iter(), + )) } pub fn encode_lz4_general2(input: ArrayRef) -> Vec {