Description
I've been building a performance testing suite to measure the impact of the concurrent table scan work that I've been doing. I created a docker-compose file that uses the Tabular spark-iceberg container and minio to create an Iceberg table and insert data into it from the widely-used NYC Taxi dataset. This is the Spark SQL that I used to create the table:
CREATE DATABASE IF NOT EXISTS nyc.taxis;
DROP TABLE IF EXISTS nyc.taxis;
CREATE TABLE nyc.taxis (
VendorID bigint,
tpep_pickup_datetime timestamp,
tpep_dropoff_datetime timestamp,
passenger_count double,
trip_distance double,
RatecodeID double,
store_and_fwd_flag string,
PULocationID bigint,
DOLocationID bigint,
payment_type bigint,
fare_amount double,
extra double,
mta_tax double,
tip_amount double,
tolls_amount double,
improvement_surcharge double,
total_amount double,
congestion_surcharge double,
airport_fee double
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime));
Note the partition on a timestamp column with a transform of Day.
When it inserts data, the reference Java Iceberg implementation writes the Avro manifest files, using an Avro type of Date for the partition struct value.
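For context, an Avro date and the day transform's output carry the same number: days since 1970-01-01. Only the declared type differs. A minimal sketch of that value, assuming microsecond timestamps; `day_ordinal` is a hypothetical helper for illustration, not an iceberg-rust API:

```rust
/// Microseconds in one day. Iceberg timestamps are microseconds since the Unix epoch.
const MICROS_PER_DAY: i64 = 86_400_000_000;

/// Hypothetical helper (not part of iceberg-rust): days since 1970-01-01
/// for a timestamp given in microseconds.
/// `div_euclid` rounds toward negative infinity, so timestamps before the
/// epoch land on the previous day rather than being truncated toward zero.
fn day_ordinal(timestamp_micros: i64) -> i32 {
    timestamp_micros.div_euclid(MICROS_PER_DAY) as i32
}
```

Whether this number is labelled Int or Date does not change how it is computed, which is why the disagreement only surfaces at the schema level.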
Iceberg-rust's PartitionSpec.partition_type() method calls partition_field.transform.result_type in order to determine the type of fields when constructing a StructType for the table's partition schema:
let res_type = partition_field.transform.result_type(&field.field_type)?;
For Transform::Day, Transform's result_type method maps the source column type (here PrimitiveType::Timestamp) to PrimitiveType::Int:
iceberg-rust/crates/iceberg/src/spec/transform.rs
Lines 197 to 208 in 244a218
Transform::Year | Transform::Month | Transform::Day => {
    if let Type::Primitive(p) = input_type {
        match p {
            PrimitiveType::Timestamp
            | PrimitiveType::Timestamptz
            | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("{input_type} is not a valid input type of {self} transform",),
            )),
        }
    } else {
This is inconsistent with the Java implementation, and I think this should map to PrimitiveType::Date.
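One possible shape for the corrected mapping is sketched below. The enums are simplified stand-ins for the iceberg-rust types, and the sketch assumes that only Day should yield Date while Year and Month keep Int. That narrowing is an assumption: the match arm quoted above is shared by all three transforms, so a one-line change to it affects Year and Month as well.

```rust
// Simplified stand-ins for illustration only, not the real iceberg-rust definitions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PrimitiveType {
    Int,
    Long,
    Date,
    Timestamp,
    Timestamptz,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Transform {
    Year,
    Month,
    Day,
}

/// Sketch of a result-type lookup in which Day produces Date.
/// Assumption: Year and Month continue to produce Int.
fn result_type(transform: Transform, input: PrimitiveType) -> Result<PrimitiveType, String> {
    use PrimitiveType::{Date, Int, Timestamp, Timestamptz};
    match (transform, input) {
        // Day over any temporal input is reported as a Date.
        (Transform::Day, Timestamp | Timestamptz | Date) => Ok(Date),
        // Year and Month stay plain integers.
        (Transform::Year | Transform::Month, Timestamp | Timestamptz | Date) => Ok(Int),
        // Anything else is not a valid input for a temporal transform.
        (t, other) => Err(format!(
            "{other:?} is not a valid input type of {t:?} transform"
        )),
    }
}
```

With this shape, a days(timestamp) partition field is typed as Date, while a non-temporal input such as Long is still rejected.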
The consequence shows up when a file plan in iceberg-rust parses a manifest file written by the Java implementation for a Transform::Day partition. manifest_schema_v2() in manifest.rs calls schema_to_avro_schema(), which visits the partition schema with SchemaToAvroSchema. That visitor understandably converts the PrimitiveType::Int into AvroSchema::Int:
iceberg-rust/crates/iceberg/src/avro/schema.rs
Lines 193 to 199 in 244a218
fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
    let avro_schema = match p {
        PrimitiveType::Boolean => AvroSchema::Boolean,
        PrimitiveType::Int => AvroSchema::Int,
        PrimitiveType::Long => AvroSchema::Long,
        PrimitiveType::Float => AvroSchema::Float,
        PrimitiveType::Double => AvroSchema::Double,
This inconsistency between the schema passed to apache_avro's parser and the schema of the manifest file itself causes apache_avro to fail to parse the manifest.
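The mismatch can be illustrated with a toy model. Everything here is a simplified stand-in: the enums are not the iceberg-rust or apache_avro types, and the strict equality check is an assumption that stands in for the parse failure rather than reproducing apache_avro's actual rules.

```rust
// Toy stand-ins for illustration; the real types have many more variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IcebergPrimitive {
    Int,
    Date,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum AvroSchema {
    Int,
    Date,
}

/// Mirrors the idea of the visitor above: each primitive maps to one Avro schema.
fn to_avro(p: IcebergPrimitive) -> AvroSchema {
    match p {
        IcebergPrimitive::Int => AvroSchema::Int,
        IcebergPrimitive::Date => AvroSchema::Date,
    }
}

/// Compares the schema the reader derives from the partition type with the
/// schema the writer stored in the manifest.
/// Assumption: any difference is treated as a hard failure.
fn check_partition_field(expected: IcebergPrimitive, written: AvroSchema) -> Result<(), String> {
    let reader = to_avro(expected);
    if reader == written {
        Ok(())
    } else {
        Err(format!(
            "reader schema {reader:?} does not match writer schema {written:?}"
        ))
    }
}
```

In this model, a reader that expects Int rejects a manifest whose partition field was written as Date, and a reader that expects Date accepts it.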
I propose changing transform.rs line 202, which currently reads:
| PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
so that it maps to PrimitiveType::Date instead, as this fixes the problem.
This necessitates changing the following tests as they fail with the proposed change:
- spec::partition::tests::test_partition_type
- transform::temporal::test::test_day_transform
- transform::temporal::test::test_month_transform
- transform::temporal::test::test_year_transform
I'll create a PR containing the fix.