diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ada9b473a..74fbb52c3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -178,6 +178,7 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_data_util.cc avro/avro_direct_decoder.cc avro/avro_direct_encoder.cc + avro/avro_metrics.cc avro/avro_reader.cc avro/avro_writer.cc avro/avro_register.cc diff --git a/src/iceberg/avro/avro_metrics.cc b/src/iceberg/avro/avro_metrics.cc new file mode 100644 index 000000000..82880c46c --- /dev/null +++ b/src/iceberg/avro/avro_metrics.cc @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_metrics.h" + +namespace iceberg::avro { + +Metrics AvroMetrics::GetMetrics(const Schema& /*schema*/, int64_t num_records, + const MetricsConfig& /*metrics_config*/) { + // TODO(WZhuo) will populate in following PRs if datum writer is a + // MetricsAwareDatumWriter + return Metrics{.row_count = num_records}; +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_metrics.h b/src/iceberg/avro/avro_metrics.h new file mode 100644 index 000000000..799c00ee3 --- /dev/null +++ b/src/iceberg/avro/avro_metrics.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/iceberg_bundle_export.h" +#include "iceberg/metrics.h" + +namespace iceberg { +class Schema; +class MetricsConfig; +} // namespace iceberg + +namespace iceberg::avro { + +/// \brief Utility class for computing Avro file metrics. +class ICEBERG_BUNDLE_EXPORT AvroMetrics { + public: + AvroMetrics() = delete; + + /// \brief Compute metrics from writer state. + /// \param schema The Iceberg schema of the written data. + /// \param num_records The number of records written. + /// \param metrics_config The metrics configuration. + /// \return Metrics for the written Avro file. + static Metrics GetMetrics(const Schema& schema, int64_t num_records, + const MetricsConfig& metrics_config); +}; + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 350530b3d..32ce3f634 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -33,9 +33,11 @@ #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_direct_encoder_internal.h" +#include "iceberg/avro/avro_metrics.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" +#include "iceberg/metrics_config.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/util/checked_cast.h" @@ -238,6 +240,7 @@ class AvroWriter::Impl { ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i)); } + num_records_ += result->length(); return {}; } @@ -261,6 +264,14 @@ class AvroWriter::Impl { return current_pos; } + Result metrics() const { + if (!Closed()) { + return Invalid("AvroWriter is not closed"); + } + return AvroMetrics::GetMetrics(*write_schema_, num_records_, + *MetricsConfig::Default()); + } + private: // The schema to write. std::shared_ptr<::iceberg::Schema> write_schema_; @@ -272,6 +283,8 @@ class AvroWriter::Impl { ArrowSchema arrow_schema_; // Total length of the written Avro file. int64_t total_bytes_ = 0; + // Number of records written. + int64_t num_records_ = 0; // The write backend to write data. std::unique_ptr backend_; }; @@ -292,13 +305,7 @@ Status AvroWriter::Close() { return {}; } -Result AvroWriter::metrics() { - if (impl_->Closed()) { - // TODO(xiao.dong) implement metrics - return {}; - } - return Invalid("AvroWriter is not closed"); -} +Result AvroWriter::metrics() { return impl_->metrics(); } Result AvroWriter::length() { return impl_->length(); } diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 518948d30..82da97ea3 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -936,6 +936,44 @@ TEST_P(AvroWriterTest, MultipleAvroBlocks) { } } +TEST_P(AvroWriterTest, Metrics) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared())}); + + std::string test_data = R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])"; + + // Write data but don't close yet + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + auto array = ::arrow::json::ArrayFromJSONString(arrow_schema, test_data).ValueOrDie(); + struct ArrowArray arrow_array; + ASSERT_TRUE(::arrow::ExportArray(*array, &arrow_array).ok()); + + ICEBERG_UNWRAP_OR_FAIL( + writer_, + WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .schema = schema, .io = file_io_, .properties = {}})); + ASSERT_THAT(writer_->Write(&arrow_array), IsOk()); + + // Metrics should fail before close + ASSERT_THAT(writer_->metrics(), IsError(ErrorKind::kInvalid)); + + // After close, metrics should succeed + ASSERT_THAT(writer_->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, writer_->metrics()); + ASSERT_TRUE(metrics.row_count.has_value()); + EXPECT_EQ(metrics.row_count.value(), 3); + EXPECT_TRUE(metrics.column_sizes.empty()); + EXPECT_TRUE(metrics.value_counts.empty()); + EXPECT_TRUE(metrics.null_value_counts.empty()); + EXPECT_TRUE(metrics.nan_value_counts.empty()); + EXPECT_TRUE(metrics.lower_bounds.empty()); + EXPECT_TRUE(metrics.upper_bounds.empty()); +} + // Instantiate parameterized tests for both direct encoder and GenericDatum paths INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest, ::testing::Values(true, false),