Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions fff-bench/src/bench_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ impl BenchmarkDatasets {
let fff = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(output_path)
.unwrap();
write_fff(&batches, &fff, options.clone()).unwrap();
Expand Down Expand Up @@ -1029,7 +1030,7 @@ impl BenchmarkDatasets {
if PRINT_CR {
error!(
"FFF CR: {:.2}",
fff_size as f64 / get_arrow_size(self, &output_fname) as f64
fff_size as f64 / get_arrow_size(self, output_fname) as f64
);
}
}
Expand Down Expand Up @@ -1240,9 +1241,14 @@ impl BenchmarkDataset for BenchmarkDatasets {
BenchmarkDatasets::CFB(_) => {
write_csv_as_parquet(f, output_path, ",", "", opt.rg_size, false)
}
BenchmarkDatasets::PBI(_) => {
write_csv_as_parquet(f, output_path, "|", "null", opt.rg_size, opt.is_dict_scope)
}
BenchmarkDatasets::PBI(_) => write_csv_as_parquet(
f,
output_path,
"|",
"null",
opt.rg_size,
opt.is_dict_scope,
),
BenchmarkDatasets::LAION(_) => {
unreachable!()
}
Expand Down Expand Up @@ -1519,8 +1525,10 @@ impl BenchmarkDataset for BenchmarkDatasets {
}

async fn write_as_lance_v2_1(&self) {
let mut opts = LanceFileWriterOptions::default();
opts.format_version = Some(LanceFileVersion::V2_1);
let opts = lance_file::v2::writer::FileWriterOptions {
format_version: Some(LanceFileVersion::V2_1),
..Default::default()
};
self._write_as_lance(opts).await
}

Expand Down
25 changes: 13 additions & 12 deletions fff-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub fn write_fff(batches: &[RecordBatch], fff: &File, options: FileWriterOptions
let mut memory_usage_sum = 0;
let mut cnt = 0;
for batch in batches {
fff_writer.write_batch(&batch).unwrap();
fff_writer.write_batch(batch).unwrap();
if fff_writer.memory_size() != 0 {
memory_usage_sum += fff_writer.memory_size();
cnt += 1;
Expand All @@ -122,10 +122,11 @@ pub fn read_fff(pathbuf: PathBuf, opt: ReadFFFOpt) -> Result<Vec<RecordBatch>> {
object_store::aws::AmazonS3Builder::from_env()
.with_url(path)
.with_retry({
let mut res = object_store::RetryConfig::default();
res.retry_timeout = std::time::Duration::from_secs(10);
res.max_retries = 1;
res
object_store::RetryConfig {
retry_timeout: std::time::Duration::from_secs(10),
max_retries: 1,
..Default::default()
}
})
.build()
.unwrap(),
Expand All @@ -134,7 +135,7 @@ pub fn read_fff(pathbuf: PathBuf, opt: ReadFFFOpt) -> Result<Vec<RecordBatch>> {
// count the number of slashes
let slash_count = path.chars().filter(|&c| c == '/').count();
assert!(slash_count == 3, "only support s3://bucket/file_name");
let file_name = path.split('/').last().unwrap();
let file_name = path.split('/').next_back().unwrap();
let f1 = fff_poc::io::reader::ObjectStoreReadAt::new(
bucket.clone(),
object_store::path::Path::from(file_name).into(),
Expand Down Expand Up @@ -172,7 +173,7 @@ pub fn read_fff_aot_wasm(
pub fn write_parquet(batches: &[RecordBatch], parquet: &File) -> Result<()> {
let mut parquet_writer = ArrowWriter::try_new(parquet, batches[0].schema(), None).unwrap();
for batch in batches {
parquet_writer.write(&batch).unwrap();
parquet_writer.write(batch).unwrap();
}
parquet_writer.close().unwrap();
Ok(())
Expand Down Expand Up @@ -234,7 +235,7 @@ pub async fn read_lance(
full_path: bool,
) -> Result<usize> {
let object_store = Arc::new({
let res = if path.starts_with("s3://") {
if path.starts_with("s3://") {
let (store, _) = lance_io::object_store::ObjectStore::from_uri(path)
.await
.unwrap();
Expand All @@ -243,8 +244,7 @@ pub async fn read_lance(
let mut res = lance_io::object_store::ObjectStore::local();
res.set_io_parallelism(1);
res
};
res
}
});
let path = if full_path {
if path.starts_with("s3://") {
Expand Down Expand Up @@ -318,6 +318,7 @@ pub fn write_orc(batches: &[RecordBatch], path: &str) -> Result<()> {
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
let mut writer = orc_rust::ArrowWriterBuilder::new(file, batches[0].schema())
.try_build()
Expand Down Expand Up @@ -401,7 +402,7 @@ pub fn parquet_decompress_from<T: ChunkReader + 'static>(
let builder = if let Some(projections) = projections {
let file_metadata = builder.metadata().file_metadata().clone();
let mask =
ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().map(|&x| x));
ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().copied());
builder.with_projection(mask)
} else {
builder
Expand Down Expand Up @@ -440,7 +441,7 @@ pub async fn parquet_decompress_from_async(
let builder = if let Some(projections) = projections {
let file_metadata = builder.metadata().file_metadata().clone();
let mask =
ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().map(|&x| x));
ProjectionMask::leaves(file_metadata.schema_descr(), projections.iter().copied());
builder.with_projection(mask)
} else {
builder
Expand Down
2 changes: 1 addition & 1 deletion fff-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod errors;
pub mod macros;
pub mod util;
pub mod macros;
2 changes: 1 addition & 1 deletion fff-core/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ pub fn ceil<T: num::Integer>(value: T, divisor: T) -> T {
#[inline]
pub fn padding_size(size: usize, alignment: usize) -> usize {
size.next_multiple_of(alignment) - size
}
}
6 changes: 3 additions & 3 deletions fff-core/src/util/buffer_to_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ pub fn primitive_array_from_arrow_buffers_iter(
let null_buffer = arrow_buffer_to_validity(null_buffer, num_rows);

let data_buffer = buffer_iter.next().unwrap();
let data_buffer = Buffer::from(data_buffer);
let data_buffer = data_buffer;
let data_buffer = BooleanBuffer::new(data_buffer, 0, num_rows as usize);

Ok(Arc::new(BooleanArray::new(data_buffer, null_buffer)))
Expand Down Expand Up @@ -734,12 +734,12 @@ pub fn primitive_array_from_buffers(
DataType::List(child) => Ok(new_list_offsets_validity::<Int32Type>(
buffers,
num_rows,
Arc::clone(&child),
Arc::clone(child),
)),
DataType::LargeList(child) => Ok(new_list_offsets_validity::<Int64Type>(
buffers,
num_rows,
Arc::clone(&child),
Arc::clone(child),
)),
_ => Err(Error::IO(
format!(
Expand Down
2 changes: 1 addition & 1 deletion fff-encoding/src/data_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) struct DataBuffersIter<'a> {
index: usize,
}

impl<'a> Iterator for DataBuffersIter<'a> {
impl Iterator for DataBuffersIter<'_> {
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
8 changes: 4 additions & 4 deletions fff-encoding/src/enc_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ impl FlatEncUnit {
self.encoding_tree.serialize(&mut s).unwrap();
let encoding_tree = s.take_buffer();
write.write_u32::<LittleEndian>(encoding_tree.len() as u32)?;
write.write(&encoding_tree)?;
write.write_all(&encoding_tree)?;
let written_len =
4 + self.buffer_sizes.as_ref().unwrap().len() * 4 + 4 + encoding_tree.len();
write.write(&ZEROS[..padding_size(written_len, ALIGNMENT)])?;
write.write_all(&ZEROS[..padding_size(written_len, ALIGNMENT)])?;
for buf in self.buffers.iter() {
for buffer in buf.iter() {
write.write_all(buffer.as_ref())?;
}
}
let written_len = write.stream_position()? - start;
write.write(&ZEROS[..padding_size(written_len as usize, ALIGNMENT)])?;
write.write_all(&ZEROS[..padding_size(written_len as usize, ALIGNMENT)])?;
Ok(write)
}

Expand Down Expand Up @@ -185,7 +185,7 @@ impl FlatEncUnit {
Ok(Self {
encoding_tree,
buffer_sizes: None,
buffers: buffers.into_iter().map(|b| DataBuffer::Dense(b)).collect(),
buffers: buffers.into_iter().map(DataBuffer::Dense).collect(),
})
}

Expand Down
2 changes: 1 addition & 1 deletion fff-encoding/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod data_buffer;
pub mod enc_unit;
pub mod schemes;
pub mod enc_unit;
1 change: 0 additions & 1 deletion fff-encoding/src/schemes/bp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/// Deprecated after using Vortex.
/// Leave it here only for legacy usage.

use std::mem;

use crate::enc_unit::ALIGNMENT;
Expand Down
25 changes: 11 additions & 14 deletions fff-encoding/src/schemes/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ impl VortexEncoder {
}

fn regular_encode(&self, arr: ArrayRef) -> Result<EncUnit> {
debug_assert!(match arr.data_type() {
non_nest_types!() => true,
_ => false,
});
debug_assert!(matches!(arr.data_type(), non_nest_types!()));
Ok(EncUnit::new(
self.encode_arr(arr)?,
Encoding::Vortex,
Expand All @@ -121,10 +118,10 @@ impl VortexEncoder {

/// For List type, we only encode the validity and offsets, children are handled by logical encoders.
fn list_encode(&self, arr: ArrayRef) -> Result<EncUnit> {
debug_assert!(match arr.data_type() {
DataType::List(_) | DataType::LargeList(_) => true,
_ => false,
});
debug_assert!(matches!(
arr.data_type(),
DataType::List(_) | DataType::LargeList(_)
));
let (validity, offsets) = match arr.data_type() {
DataType::List(_) => {
let list_array = arr
Expand Down Expand Up @@ -177,10 +174,10 @@ impl VortexEncoder {
}

pub fn list_struct_encode(&self, list_arr: ArrayRef, field: ArrayRef) -> Result<EncUnit> {
debug_assert!(match list_arr.data_type() {
DataType::List(_) | DataType::LargeList(_) => true,
_ => false,
});
debug_assert!(matches!(
list_arr.data_type(),
DataType::List(_) | DataType::LargeList(_)
));
let (validity, offsets) = match list_arr.data_type() {
DataType::List(_) => {
let list_array = list_arr
Expand Down Expand Up @@ -302,7 +299,7 @@ impl VortexDecoderBuilder {
}

pub fn with_ppd(mut self, ppd: VtxPPD) -> Self {
assert!(self.partial_decode == false);
assert!(!self.partial_decode);
self.ppd = Some(ppd);
self
}
Expand Down Expand Up @@ -419,7 +416,7 @@ impl Decoder for VortexDecoder {
/// FIXME: decode_a_vector should be redesigned to be indexing based. Or just like slice.
fn decode_a_vector(&mut self) -> Result<Option<Vec<Buffer>>> {
Ok(self.vortex_array.as_ref().map(|arr| {
let arr = slice(&arr, 0, 1024).expect("Slice failed");
let arr = slice(arr, 0, 1024).expect("Slice failed");
let mut res = Vec::new();
let arrow_array = vortex_array_to_arrow(arr);
if let Some(b) = arrow_array.nulls() {
Expand Down
13 changes: 5 additions & 8 deletions fff-format/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ fn main() -> Result<()> {
);

// compile flatc if it does not exist
if flatc.check().is_err() {
if !Command::new("sh")
if flatc.check().is_err() && !Command::new("sh")
.current_dir(Path::new(&env::var("CARGO_MANIFEST_DIR").unwrap()).join(".."))
.args(&["scripts/install_flatc.sh"])
.args(["scripts/install_flatc.sh"])
.status()
.with_context(|| "install_flatc.sh failed")?
.success()
{
anyhow::bail!("install_flatc.sh failed");
}
.success() {
anyhow::bail!("install_flatc.sh failed");
}

flatc
Expand All @@ -45,7 +42,7 @@ fn main() -> Result<()> {
.iter()
.map(|p| p.as_path())
.collect::<Vec<&Path>>(),
out_dir: Path::new(env::var("OUT_DIR").unwrap().as_str()).as_ref(),
out_dir: Path::new(env::var("OUT_DIR").unwrap().as_str()),
extra: &["--filename-suffix", ""],
..Default::default()
})?;
Expand Down
5 changes: 2 additions & 3 deletions fff-poc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ impl WASMWritingContext {
}

pub fn data_type_to_wasm_id(&self, dt: &DataType) -> Option<WASMId> {
self.data_type_to_wasm_id.get(dt).map(|&x| x)
self.data_type_to_wasm_id.get(dt).copied()
}

pub fn data_type_to_wasm_lib(&self, dt: &DataType) -> Option<WasmLib> {
self.data_type_to_wasm_id
.get(dt)
.map(|x| self.wasms.get(x).cloned())
.flatten()
.and_then(|x| self.wasms.get(x).cloned())
}

pub fn always_set_custom_wasm_for_built_in(&self) -> bool {
Expand Down
39 changes: 11 additions & 28 deletions fff-poc/src/decoder/encunit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<'a> WASMEncUnitDecoder<'a> {
}
}

impl<'a> EncUnitDecoder for WASMEncUnitDecoder<'a> {
impl EncUnitDecoder for WASMEncUnitDecoder<'_> {
fn decode(&self) -> Result<ArrayRef> {
match &self.output_type {
non_nest_types!() => {
Expand Down Expand Up @@ -142,21 +142,13 @@ impl EncUnitDecoder for VortexEncUnitDecoder {
vortex_decoder.decode_all_as_array()?
}
DataType::List(ref child) | DataType::LargeList(ref child)
if match child.data_type() {
DataType::Struct(fields)
if fields
.iter()
.map(|f| match f.data_type() {
non_nest_types!() => true,
_ => false,
})
.fold(true, |acc, x| acc && x)
&& cfg!(feature = "list-offsets-pushdown") =>
{
true
}
_ => false,
} =>
if matches!(child.data_type(),
DataType::Struct(fields)
if fields
.iter()
.all(|f| matches!(f.data_type(), non_nest_types!()))
&& cfg!(feature = "list-offsets-pushdown")
) =>
{
let mut vortex_decoder = VortexListStructDecoder::try_new(
bytes,
Expand Down Expand Up @@ -199,21 +191,12 @@ impl EncUnitDecoder for VortexEncUnitDecoder {
vortex_decoder.slice(start, stop)?
}
DataType::List(ref child) | DataType::LargeList(ref child)
if match child.data_type() {
if matches!(child.data_type(),
DataType::Struct(fields)
if fields
.iter()
.map(|f| match f.data_type() {
non_nest_types!() => true,
_ => false,
})
.fold(true, |acc, x| acc && x)
&& cfg!(feature = "list-offsets-pushdown") =>
{
true
}
_ => false,
} =>
.all(|f| matches!(f.data_type(), non_nest_types!()))
&& cfg!(feature = "list-offsets-pushdown")) =>
{
let mut vortex_decoder = VortexListStructDecoder::try_new(
bytes,
Expand Down
Loading