Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metadata): export iceberg schema in manifests table #871

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 135 additions & 54 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

//! Metadata table api.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::Result;

Expand Down Expand Up @@ -134,44 +137,122 @@ pub struct ManifestsTable<'a> {
}

impl<'a> ManifestsTable<'a> {
/// Builds the Arrow fields that make up one struct entry of the
/// `partition_summaries` list column.
///
/// `contains_null` is the only non-nullable member; `contains_nan`,
/// `lower_bound` and `upper_bound` are all nullable.
fn partition_summary_fields(&self) -> Vec<Field> {
    // One row per struct member, in column order: (name, data type, nullable).
    let columns = [
        ("contains_null", DataType::Boolean, false),
        ("contains_nan", DataType::Boolean, true),
        ("lower_bound", DataType::Utf8, true),
        ("upper_bound", DataType::Utf8, true),
    ];

    Vec::from(columns.map(|(name, data_type, nullable)| Field::new(name, data_type, nullable)))
}

/// Returns the schema of the manifests table.
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
Field::new(
/// Returns the iceberg schema of the manifests table.
pub fn schema(&self) -> crate::spec::Schema {
let fields = vec![
NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
NestedField::new(
3,
"partition_spec_id",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
4,
"added_snapshot_id",
Type::Primitive(PrimitiveType::Long),
true,
),
NestedField::new(
5,
"added_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
6,
"existing_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
7,
"deleted_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
15,
"added_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
16,
"existing_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
17,
"deleted_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
8,
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
))),
false,
Type::List(ListType {
element_field: Arc::new(NestedField::new(
9,
"item",
Type::Struct(StructType::new(vec![
Arc::new(NestedField::new(
10,
"contains_null",
Type::Primitive(PrimitiveType::Boolean),
true,
)),
Arc::new(NestedField::new(
11,
"contains_nan",
Type::Primitive(PrimitiveType::Boolean),
false,
)),
Arc::new(NestedField::new(
12,
"lower_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
Arc::new(NestedField::new(
13,
"upper_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
])),
true,
)),
}),
true,
),
])
];

crate::spec::Schema::builder()
.with_fields(fields.into_iter().map(|f| f.into()))
.build()
.unwrap()
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let schema = schema_to_arrow_schema(&self.schema())?;
let partition_summary_fields = if let DataType::List(list_type) =
schema.field_with_name("partition_summaries")?.data_type()
{
if let DataType::Struct(fields) = list_type.data_type() {
fields.to_vec()
} else {
unreachable!()
}
} else {
unreachable!()
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

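This is very ugly 😲 — the struct fields of `partition_summaries` have to be dug back out of the converted Arrow schema by matching on `DataType::List` and then `DataType::Struct`, with `unreachable!()` on both fall-through branches.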


let mut content = PrimitiveBuilder::<Int32Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
Expand All @@ -183,21 +264,21 @@ impl<'a> ManifestsTable<'a> {
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(self.partition_summary_fields()),
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
)));
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

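This is also ugly 😲 — the list item's `PARQUET:field_id` metadata is hard-coded as "9" here and has to be kept in sync by hand with the element field id declared in `schema()`.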

));

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
Expand Down Expand Up @@ -238,7 +319,7 @@ impl<'a> ManifestsTable<'a> {
}
}

Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
Ok(RecordBatch::try_new(Arc::new(schema), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Expand Down Expand Up @@ -397,20 +478,20 @@ mod tests {
check_record_batch(
record_batch,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
expect![[r#"
content: PrimitiveArray<Int8>
content: PrimitiveArray<Int32>
[
0,
],
Expand Down
Loading