Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
avro/avro_direct_encoder.cc
avro/avro_metrics.cc
avro/avro_reader.cc
avro/avro_writer.cc
avro/avro_register.cc
Expand Down
31 changes: 31 additions & 0 deletions src/iceberg/avro/avro_metrics.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/avro/avro_metrics.h"

namespace iceberg::avro {

Metrics AvroMetrics::GetMetrics(const Schema& /*schema*/, int64_t num_records,
                                const MetricsConfig& /*metrics_config*/) {
  // Only the record count is available from the writer today; the schema and
  // metrics config are accepted for interface stability but not yet consulted.
  // TODO(WZhuo) will populate in following PRs if datum writer is a
  // MetricsAwareDatumWriter
  Metrics metrics;
  metrics.row_count = num_records;
  return metrics;
}

} // namespace iceberg::avro
46 changes: 46 additions & 0 deletions src/iceberg/avro/avro_metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "iceberg/iceberg_bundle_export.h"
#include "iceberg/metrics.h"

namespace iceberg {
class Schema;
class MetricsConfig;
} // namespace iceberg

namespace iceberg::avro {

/// \brief Utility class for computing Avro file metrics.
///
/// Stateless: exposes only static helpers, so instantiation is disabled.
class ICEBERG_BUNDLE_EXPORT AvroMetrics {
 public:
  // Static-only utility class; no instances should ever be created.
  AvroMetrics() = delete;

  /// \brief Compute metrics from writer state.
  /// \param schema The Iceberg schema of the written data.
  /// \param num_records The number of records written.
  /// \param metrics_config The metrics configuration.
  /// \return Metrics for the written Avro file.
  static Metrics GetMetrics(const Schema& schema, int64_t num_records,
                            const MetricsConfig& metrics_config);
};

} // namespace iceberg::avro
21 changes: 14 additions & 7 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_direct_encoder_internal.h"
#include "iceberg/avro/avro_metrics.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/metrics_config.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -238,6 +240,7 @@ class AvroWriter::Impl {
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i));
}

num_records_ += result->length();
return {};
}

Expand All @@ -261,6 +264,14 @@ class AvroWriter::Impl {
return current_pos;
}

Result<Metrics> metrics() const {
  // Metrics are only well-defined once the file has been finalized; callers
  // must Close() the writer before asking for them.
  if (Closed()) {
    return AvroMetrics::GetMetrics(*write_schema_, num_records_,
                                   *MetricsConfig::Default());
  }
  return Invalid("AvroWriter is not closed");
}

private:
// The schema to write.
std::shared_ptr<::iceberg::Schema> write_schema_;
Expand All @@ -272,6 +283,8 @@ class AvroWriter::Impl {
ArrowSchema arrow_schema_;
// Total length of the written Avro file.
int64_t total_bytes_ = 0;
// Number of records written.
int64_t num_records_ = 0;
// The write backend to write data.
std::unique_ptr<AvroWriteBackend> backend_;
};
Expand All @@ -292,13 +305,7 @@ Status AvroWriter::Close() {
return {};
}

Result<Metrics> AvroWriter::metrics() {
if (impl_->Closed()) {
// TODO(xiao.dong) implement metrics
return {};
}
return Invalid("AvroWriter is not closed");
}
Result<Metrics> AvroWriter::metrics() { return impl_->metrics(); }

Result<int64_t> AvroWriter::length() { return impl_->length(); }

Expand Down
38 changes: 38 additions & 0 deletions src/iceberg/test/avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,44 @@ TEST_P(AvroWriterTest, MultipleAvroBlocks) {
}
}

TEST_P(AvroWriterTest, Metrics) {
  // Two-column schema: required int id, optional string name.
  const auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});

  const std::string row_json = R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])";

  // Build an Arrow array from JSON and hand it over via the C data interface.
  ArrowSchema c_schema;
  ASSERT_THAT(ToArrowSchema(*schema, &c_schema), IsOk());
  const auto imported_type = ::arrow::ImportType(&c_schema).ValueOrDie();
  const auto rows =
      ::arrow::json::ArrayFromJSONString(imported_type, row_json).ValueOrDie();
  struct ArrowArray c_array;
  ASSERT_TRUE(::arrow::ExportArray(*rows, &c_array).ok());

  ICEBERG_UNWRAP_OR_FAIL(
      writer_,
      WriterFactoryRegistry::Open(
          FileFormatType::kAvro,
          {.path = temp_avro_file_, .schema = schema, .io = file_io_, .properties = {}}));
  ASSERT_THAT(writer_->Write(&c_array), IsOk());

  // Asking for metrics before Close() must be rejected.
  ASSERT_THAT(writer_->metrics(), IsError(ErrorKind::kInvalid));

  // After Close(), metrics become available.
  ASSERT_THAT(writer_->Close(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto metrics, writer_->metrics());

  // Only the row count is populated for Avro files today; all column-level
  // statistics are expected to be empty.
  ASSERT_TRUE(metrics.row_count.has_value());
  EXPECT_EQ(metrics.row_count.value(), 3);
  EXPECT_TRUE(metrics.column_sizes.empty());
  EXPECT_TRUE(metrics.value_counts.empty());
  EXPECT_TRUE(metrics.null_value_counts.empty());
  EXPECT_TRUE(metrics.nan_value_counts.empty());
  EXPECT_TRUE(metrics.lower_bounds.empty());
  EXPECT_TRUE(metrics.upper_bounds.empty());
}

// Instantiate parameterized tests for both direct encoder and GenericDatum paths
INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
::testing::Values(true, false),
Expand Down
Loading