diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 519757d26..f4346a9d6 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -43,6 +43,7 @@ set(ICEBERG_SOURCES manifest/manifest_list.cc manifest/manifest_reader.cc manifest/manifest_writer.cc + manifest/rolling_manifest_writer.cc manifest/v1_metadata.cc manifest/v2_metadata.cc manifest/v3_metadata.cc diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index f3540dd75..72ab0da18 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer { virtual Result metrics() = 0; /// \brief Get the file length. - /// Only valid after the file is closed. + /// This can be called while the writer is still open or after the file is closed. virtual Result length() = 0; /// \brief Returns a list of recommended split locations, if applicable, empty diff --git a/src/iceberg/manifest/manifest_entry.h b/src/iceberg/manifest/manifest_entry.h index f82b7a2be..5d35530b8 100644 --- a/src/iceberg/manifest/manifest_entry.h +++ b/src/iceberg/manifest/manifest_entry.h @@ -46,7 +46,7 @@ enum class ManifestStatus { /// \brief Get the relative manifest status type from int ICEBERG_EXPORT constexpr Result ManifestStatusFromInt( - int status) noexcept { + int32_t status) noexcept { switch (status) { case 0: return ManifestStatus::kExisting; @@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexc /// \brief Get the relative data file content type from int ICEBERG_EXPORT constexpr Result DataFileContentFromInt( - int content) noexcept { + int32_t content) noexcept { switch (content) { case 0: return DataFile::Content::kData; diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc index 8cd940d56..e3d2564ad 100644 --- a/src/iceberg/manifest/manifest_writer.cc +++ b/src/iceberg/manifest/manifest_writer.cc @@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); } Result ManifestWriter::metrics() const { return writer_->metrics(); } +Result ManifestWriter::length() const { return writer_->length(); } + Result ManifestWriter::ToManifestFile() const { if (!closed_) [[unlikely]] { return Invalid("Cannot get ManifestFile before closing the writer."); diff --git a/src/iceberg/manifest/manifest_writer.h b/src/iceberg/manifest/manifest_writer.h index a708e6e3b..218ceb595 100644 --- a/src/iceberg/manifest/manifest_writer.h +++ b/src/iceberg/manifest/manifest_writer.h @@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter { /// \note Only valid after the file is closed. Result metrics() const; + /// \brief Get the current length of the manifest file in bytes. + /// \return The current length of the file, or an error if the operation fails. + Result length() const; + /// \brief Get the ManifestFile object. /// \note Only valid after the file is closed. Result ToManifestFile() const; diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build index f49c5a5f5..00f93c599 100644 --- a/src/iceberg/manifest/meson.build +++ b/src/iceberg/manifest/meson.build @@ -21,6 +21,7 @@ install_headers( 'manifest_list.h', 'manifest_reader.h', 'manifest_writer.h', + 'rolling_manifest_writer.h', ], subdir: 'iceberg/manifest', ) diff --git a/src/iceberg/manifest/rolling_manifest_writer.cc b/src/iceberg/manifest/rolling_manifest_writer.cc new file mode 100644 index 000000000..827c48754 --- /dev/null +++ b/src/iceberg/manifest/rolling_manifest_writer.cc @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RollingManifestWriter::RollingManifestWriter( + ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) + : manifest_writer_factory_(std::move(manifest_writer_factory)), + target_file_size_in_bytes_(target_file_size_in_bytes) {} + +RollingManifestWriter::~RollingManifestWriter() { + // Ensure we close the current writer if not already closed + if (!closed_) { + (void)CloseCurrentWriter(); + } +} + +Status RollingManifestWriter::WriteAddedEntry( + std::shared_ptr file, std::optional data_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED( + writer->WriteAddedEntry(std::move(file), data_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteExistingEntry( + std::shared_ptr file, int64_t file_snapshot_id, + int64_t data_sequence_number, std::optional file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteDeletedEntry( + std::shared_ptr file, int64_t data_sequence_number, + std::optional file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry( + std::move(file), data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::Close() { + if (!closed_) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + closed_ = true; + } + return {}; +} + +Result> RollingManifestWriter::ToManifestFiles() const { + if (!closed_) { + return Invalid("Cannot get ManifestFile list from unclosed writer"); + } + return manifest_files_; +} + +Result RollingManifestWriter::CurrentWriter() { + if (current_writer_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } else if (ShouldRollToNewFile()) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } + + return current_writer_.get(); +} + +bool RollingManifestWriter::ShouldRollToNewFile() const { + if (current_writer_ == nullptr) { + return false; + } + // Roll when row count is a multiple of ROWS_DIVISOR and file size >= target + if (current_file_rows_ % kRowsDivisor == 0) { + auto length_result = current_writer_->length(); + if (length_result.has_value()) { + return length_result.value() >= target_file_size_in_bytes_; + } + // If we can't get the length, don't roll + } + return false; +} + +Status RollingManifestWriter::CloseCurrentWriter() { + if (current_writer_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(current_writer_->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, current_writer_->ToManifestFile()); + manifest_files_.push_back(std::move(manifest_file)); + current_writer_.reset(); + current_file_rows_ = 0; + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest/rolling_manifest_writer.h b/src/iceberg/manifest/rolling_manifest_writer.h new file mode 100644 index 000000000..477f476f5 --- /dev/null +++ b/src/iceberg/manifest/rolling_manifest_writer.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/rolling_manifest_writer.h +/// Rolling manifest writer that can produce multiple manifest files. + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A rolling manifest writer that can produce multiple manifest files. +/// +/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest +/// files. +class ICEBERG_EXPORT RollingManifestWriter { + public: + /// \brief Factory function type for creating ManifestWriter instances. + using ManifestWriterFactory = std::function>()>; + + /// \brief Construct a rolling manifest writer. + /// \param manifest_writer_factory Factory function to create new ManifestWriter + /// instances. + /// \param target_file_size_in_bytes Target file size in bytes. When the current + /// file reaches this size (and row count is a multiple of 250), a new file + /// will be created. + RollingManifestWriter(ManifestWriterFactory manifest_writer_factory, + int64_t target_file_size_in_bytes); + + ~RollingManifestWriter(); + + /// \brief Add an added entry for a file. + /// + /// \param file a data file + /// \return Status::OK() if the entry was written successfully + /// \note The entry's snapshot ID will be this manifest's snapshot ID. The + /// entry's data sequence number will be the provided data sequence number. + /// The entry's file sequence number will be assigned at commit. + Status WriteAddedEntry(std::shared_ptr file, + std::optional data_sequence_number = std::nullopt); + + /// \brief Add an existing entry for a file. + /// + /// \param file an existing data file + /// \param file_snapshot_id snapshot ID when the data file was added to the table + /// \param data_sequence_number a data sequence number of the file (assigned when + /// the file was added) + /// \param file_sequence_number a file sequence number (assigned when the file + /// was added) + /// \return Status::OK() if the entry was written successfully + /// \note The original data and file sequence numbers, snapshot ID, which were + /// assigned at commit, must be preserved when adding an existing entry. + Status WriteExistingEntry(std::shared_ptr file, int64_t file_snapshot_id, + int64_t data_sequence_number, + std::optional file_sequence_number = std::nullopt); + + /// \brief Add a delete entry for a file. + /// + /// \param file a deleted data file + /// \param data_sequence_number a data sequence number of the file (assigned when + /// the file was added) + /// \param file_sequence_number a file sequence number (assigned when the file + /// was added) + /// \return Status::OK() if the entry was written successfully + /// \note The entry's snapshot ID will be this manifest's snapshot ID. However, + /// the original data and file sequence numbers of the file must be preserved + /// when the file is marked as deleted. + Status WriteDeletedEntry(std::shared_ptr file, int64_t data_sequence_number, + std::optional file_sequence_number = std::nullopt); + + /// \brief Close the rolling manifest writer. + Status Close(); + + /// \brief Get the list of manifest files produced by this writer. + /// \return A vector of ManifestFile objects + /// \note Only valid after the writer is closed. + Result> ToManifestFiles() const; + + private: + /// \brief Get or create the current writer, rolling to a new file if needed. + /// \return The current ManifestWriter, or an error if creation fails + Result CurrentWriter(); + + /// \brief Check if we should roll to a new file. + /// + /// This method checks if the current file has reached the target size + /// or the number of rows has reached the threshold. If so, it rolls to a new file. + bool ShouldRollToNewFile() const; + + /// \brief Close the current writer and add its ManifestFile to the list. + Status CloseCurrentWriter(); + + static constexpr int64_t kRowsDivisor = 250; + + ManifestWriterFactory manifest_writer_factory_; + int64_t target_file_size_in_bytes_; + std::vector manifest_files_; + + int64_t current_file_rows_{0}; + std::unique_ptr current_writer_{nullptr}; + bool closed_{false}; +}; + +} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 8327ca2e0..12b1ae2e7 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -65,6 +65,7 @@ iceberg_sources = files( 'manifest/manifest_list.cc', 'manifest/manifest_reader.cc', 'manifest/manifest_writer.cc', + 'manifest/rolling_manifest_writer.cc', 'manifest/v1_metadata.cc', 'manifest/v2_metadata.cc', 'manifest/v3_metadata.cc', diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 4b2c0f473..fce7257ac 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -124,7 +124,8 @@ if(ICEBERG_BUILD_BUNDLE) manifest_list_versions_test.cc manifest_reader_stats_test.cc manifest_reader_test.cc - manifest_writer_versions_test.cc) + manifest_writer_versions_test.cc + rolling_manifest_writer_test.cc) add_iceberg_test(arrow_test USE_BUNDLE diff --git a/src/iceberg/test/rolling_manifest_writer_test.cc b/src/iceberg/test/rolling_manifest_writer_test.cc new file mode 100644 index 000000000..8ea13869e --- /dev/null +++ b/src/iceberg/test/rolling_manifest_writer_test.cc @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" + +namespace iceberg { + +namespace { + +constexpr int64_t kSequenceNumber = 34L; +constexpr int64_t kSnapshotId = 987134631982734L; +constexpr std::string_view kPath = + "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro"; +constexpr FileFormatType kFormat = FileFormatType::kAvro; +constexpr int64_t kFirstRowId = 100L; +constexpr int64_t kFileSizeCheckRowsDivisor = 250; +constexpr int64_t kSmallFileSize = 10L; + +const PartitionValues kPartition = + PartitionValues({Literal::String("cheesy"), Literal::Int(10), Literal::Int(3)}); + +std::shared_ptr CreateDataFile(int64_t record_count) { + auto data_file = std::make_shared(); + data_file->file_path = std::format("data_bucket=0/file-{}.parquet", record_count); + data_file->file_format = FileFormatType::kParquet; + data_file->partition = kPartition; + data_file->file_size_in_bytes = 1024; + data_file->record_count = record_count; + return data_file; +} + +std::shared_ptr CreateDeleteFile(int64_t record_count) { + auto delete_file = std::make_shared(); + delete_file->content = DataFile::Content::kPositionDeletes; + delete_file->file_path = std::format("/path/to/delete-{}.parquet", record_count); + delete_file->file_format = FileFormatType::kParquet; + delete_file->partition = kPartition; + delete_file->file_size_in_bytes = 10; + delete_file->record_count = record_count; + return delete_file; +} + +static std::string CreateManifestPath() { + return std::format("manifest-{}.avro", + std::chrono::system_clock::now().time_since_epoch().count()); +} + +} // namespace + +class RollingManifestWriterTest : public ::testing::TestWithParam { + protected: + void SetUp() override { + avro::RegisterAll(); + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int64()), + SchemaField::MakeRequired(2, "timestamp", timestamp_tz()), + SchemaField::MakeRequired(3, "category", string()), + SchemaField::MakeRequired(4, "data", string()), + SchemaField::MakeRequired(5, "double", float64())}); + spec_ = PartitionSpec::Make( + 0, {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "timestamp_hour", Transform::Hour()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16))}) + .value(); + + file_io_ = iceberg::arrow::MakeMockFileIO(); + } + + RollingManifestWriter::ManifestWriterFactory NewRollingWriteManifestFactory( + int32_t format_version) { + return [this, format_version]() -> Result> { + const std::string manifest_path = CreateManifestPath(); + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 1) { + writer_result = ManifestWriter::MakeV1Writer(kSnapshotId, manifest_path, file_io_, + spec_, schema_); + } else if (format_version == 2) { + writer_result = ManifestWriter::MakeV2Writer( + kSnapshotId, manifest_path, file_io_, spec_, schema_, ManifestContent::kData); + } else if (format_version == 3) { + writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId, + manifest_path, file_io_, spec_, + schema_, ManifestContent::kData); + } + + return writer_result; + }; + } + + RollingManifestWriter::ManifestWriterFactory NewRollingWriteDeleteManifestFactory( + int32_t format_version) { + return [this, format_version]() -> Result> { + const std::string manifest_path = CreateManifestPath(); + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 2) { + writer_result = + ManifestWriter::MakeV2Writer(kSnapshotId, manifest_path, file_io_, spec_, + schema_, ManifestContent::kDeletes); + } else if (format_version == 3) { + writer_result = ManifestWriter::MakeV3Writer(kSnapshotId, kFirstRowId, + manifest_path, file_io_, spec_, + schema_, ManifestContent::kDeletes); + } + + return writer_result; + }; + } + + void CheckManifests(const std::vector& manifests, + const std::vector& added_file_counts, + const std::vector& existing_file_counts, + const std::vector& deleted_file_counts, + const std::vector& added_row_counts, + const std::vector& existing_row_counts, + const std::vector& deleted_row_counts) { + ASSERT_EQ(manifests.size(), added_file_counts.size()); + for (size_t i = 0; i < manifests.size(); i++) { + const ManifestFile& manifest = manifests[i]; + EXPECT_TRUE(manifest.has_added_files()); + EXPECT_EQ(manifest.added_files_count.value_or(0), added_file_counts[i]); + EXPECT_EQ(manifest.added_rows_count.value_or(0), added_row_counts[i]); + + EXPECT_TRUE(manifest.has_existing_files()); + EXPECT_EQ(manifest.existing_files_count.value_or(0), existing_file_counts[i]); + EXPECT_EQ(manifest.existing_rows_count.value_or(0), existing_row_counts[i]); + + EXPECT_TRUE(manifest.has_deleted_files()); + EXPECT_EQ(manifest.deleted_files_count.value_or(0), deleted_file_counts[i]); + EXPECT_EQ(manifest.deleted_rows_count.value_or(0), deleted_row_counts[i]); + } + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr file_io_; +}; + +TEST_P(RollingManifestWriterTest, TestRollingManifestWriterNoRecords) { + int32_t format_version = GetParam(); + RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version), + kSmallFileSize); + + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles()); + EXPECT_TRUE(manifest_files.empty()); + + // Test that calling close again doesn't change the result + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles()); + EXPECT_TRUE(manifest_files.empty()); +} + +TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterNoRecords) { + int32_t format_version = GetParam(); + if (format_version < 2) { + GTEST_SKIP() << "Delete manifests only supported in V2+"; + } + RollingManifestWriter writer(NewRollingWriteDeleteManifestFactory(format_version), + kSmallFileSize); + + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles()); + EXPECT_TRUE(manifest_files.empty()); + + // Test that calling close again doesn't change the result + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles()); + EXPECT_TRUE(manifest_files.empty()); +} + +TEST_P(RollingManifestWriterTest, TestRollingManifestWriterSplitFiles) { + int32_t format_version = GetParam(); + RollingManifestWriter writer(NewRollingWriteManifestFactory(format_version), + kSmallFileSize); + + std::vector added_file_counts(3, 0); + std::vector existing_file_counts(3, 0); + std::vector deleted_file_counts(3, 0); + std::vector added_row_counts(3, 0); + std::vector existing_row_counts(3, 0); + std::vector deleted_row_counts(3, 0); + + // Write 750 entries (3 * 250) to trigger 3 file splits + for (int32_t i = 0; i < kFileSizeCheckRowsDivisor * 3; i++) { + int32_t type = i % 3; + int32_t file_index = i / kFileSizeCheckRowsDivisor; + auto data_file = CreateDataFile(i); + + if (type == 0) { + EXPECT_THAT(writer.WriteAddedEntry(data_file), IsOk()); + added_file_counts[file_index] += 1; + added_row_counts[file_index] += i; + } else if (type == 1) { + EXPECT_THAT(writer.WriteExistingEntry(data_file, 1, 1, std::nullopt), IsOk()); + existing_file_counts[file_index] += 1; + existing_row_counts[file_index] += i; + } else { + EXPECT_THAT(writer.WriteDeletedEntry(data_file, 1, std::nullopt), IsOk()); + deleted_file_counts[file_index] += 1; + deleted_row_counts[file_index] += i; + } + } + + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles()); + EXPECT_EQ(manifest_files.size(), 3); + + CheckManifests(manifest_files, added_file_counts, existing_file_counts, + deleted_file_counts, added_row_counts, existing_row_counts, + deleted_row_counts); + + // Test that calling close again doesn't change the result + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles()); + EXPECT_EQ(manifest_files.size(), 3); + + CheckManifests(manifest_files, added_file_counts, existing_file_counts, + deleted_file_counts, added_row_counts, existing_row_counts, + deleted_row_counts); +} + +TEST_P(RollingManifestWriterTest, TestRollingDeleteManifestWriterSplitFiles) { + int32_t format_version = GetParam(); + if (format_version < 2) { + GTEST_SKIP() << "Delete manifests only supported in V2+"; + } + RollingManifestWriter writer(NewRollingWriteDeleteManifestFactory(format_version), + kSmallFileSize); + + std::vector added_file_counts(3, 0); + std::vector existing_file_counts(3, 0); + std::vector deleted_file_counts(3, 0); + std::vector added_row_counts(3, 0); + std::vector existing_row_counts(3, 0); + std::vector deleted_row_counts(3, 0); + + // Write 750 entries (3 * 250) to trigger 3 file splits + for (int32_t i = 0; i < 3 * kFileSizeCheckRowsDivisor; i++) { + int32_t type = i % 3; + int32_t file_index = i / kFileSizeCheckRowsDivisor; + auto delete_file = CreateDeleteFile(i); + + if (type == 0) { + EXPECT_THAT(writer.WriteAddedEntry(delete_file), IsOk()); + added_file_counts[file_index] += 1; + added_row_counts[file_index] += i; + } else if (type == 1) { + EXPECT_THAT(writer.WriteExistingEntry(delete_file, 1, 1, std::nullopt), IsOk()); + existing_file_counts[file_index] += 1; + existing_row_counts[file_index] += i; + } else { + EXPECT_THAT(writer.WriteDeletedEntry(delete_file, 1, std::nullopt), IsOk()); + deleted_file_counts[file_index] += 1; + deleted_row_counts[file_index] += i; + } + } + + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, writer.ToManifestFiles()); + EXPECT_EQ(manifest_files.size(), 3); + + CheckManifests(manifest_files, added_file_counts, existing_file_counts, + deleted_file_counts, added_row_counts, existing_row_counts, + deleted_row_counts); + + // Test that calling close again doesn't change the result + EXPECT_THAT(writer.Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(manifest_files, writer.ToManifestFiles()); + EXPECT_EQ(manifest_files.size(), 3); + + CheckManifests(manifest_files, added_file_counts, existing_file_counts, + deleted_file_counts, added_row_counts, existing_row_counts, + deleted_row_counts); +} + +INSTANTIATE_TEST_SUITE_P(TestRollingManifestWriter, RollingManifestWriterTest, + ::testing::Values(1, 2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 9404fe2e1..dd727e947 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -97,7 +97,7 @@ Status Transaction::Apply(PendingUpdate& update) { } break; default: return NotSupported("Unsupported pending update: {}", - static_cast(update.kind())); + static_cast(update.kind())); } last_update_committed_ = true; diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index 44512c0d3..e878ab8fd 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -19,6 +19,7 @@ #include "iceberg/type.h" +#include #include #include #include @@ -169,7 +170,7 @@ Result> ListType::GetFieldById( } Result> ListType::GetFieldByIndex( - int index) const { + int32_t index) const { if (index == 0) { return std::cref(element_); }