Merged
4 changes: 3 additions & 1 deletion crates/iceberg/src/spec/manifest/data_file.rs
@@ -26,7 +26,7 @@
use serde_with::{DeserializeFromStr, SerializeDisplay};
use super::_serde::DataFileSerde;
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
use crate::error::Result;
use crate::spec::{Struct, StructType};
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};

/// Data file carries data file path, partition tuple, metrics, …
@@ -49,6 +49,7 @@ pub struct DataFile {
///
/// Partition data tuple, schema based on the partition spec output using
/// partition field ids for the struct field ids
#[builder(default = "Struct::empty()")]
pub(crate) partition: Struct,
/// field id: 103
///
@@ -156,6 +157,7 @@ pub struct DataFile {
pub(crate) first_row_id: Option<i64>,
/// This field is not included in the spec; it is only stored in the
/// in-memory representation used during processing.
#[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
pub(crate) partition_spec_id: i32,
/// field id: 143
///
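With these `#[builder(default = …)]` attributes, the generated `DataFileBuilder` no longer requires an explicit partition tuple or spec id for unpartitioned tables. A minimal sketch of the effect (assuming `DataFileBuilder` is the derive_builder-generated builder exported alongside `DataFile`; the remaining setters and values are illustrative):

use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat};

let data_file = DataFileBuilder::default()
    .content(DataContentType::Data)
    .file_path("s3://bucket/data/00000.parquet".to_string())
    .file_format(DataFileFormat::Parquet)
    .record_count(100)
    .file_size_in_bytes(1024)
    // `partition` now defaults to `Struct::empty()` and
    // `partition_spec_id` to `DEFAULT_PARTITION_SPEC_ID`.
    .build()
    .unwrap();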
24 changes: 24 additions & 0 deletions crates/iceberg/src/spec/partition.rs
@@ -194,6 +194,15 @@ impl PartitionKey {
Self { spec, schema, data }
}

/// Creates a new partition key from another partition key, with a new data field.
pub fn copy_with_data(&self, data: Struct) -> Self {
Self {
spec: self.spec.clone(),
schema: self.schema.clone(),
data,
}
}

/// Generates a partition path based on the partition values.
pub fn to_path(&self) -> String {
self.spec.partition_to_path(&self.data, self.schema.clone())
@@ -207,6 +216,21 @@
Some(pk) => pk.spec.is_unpartitioned(),
}
}

/// Returns the associated [`PartitionSpec`].
pub fn spec(&self) -> &PartitionSpec {
&self.spec
}

/// Returns the associated [`SchemaRef`].
pub fn schema(&self) -> &SchemaRef {
&self.schema
}

/// Returns the associated [`Struct`].
pub fn data(&self) -> &Struct {
&self.data
}
}

/// Reference to [`UnboundPartitionSpec`].
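The new accessors and `copy_with_data` make it easy to derive sibling keys that share a spec and schema. A short sketch (assumes an existing `base_key: PartitionKey` built over a spec with a single `int` partition field; the literal value is illustrative):

use iceberg::spec::{Literal, Struct};

// Same spec and schema, different partition tuple.
let new_key = base_key.copy_with_data(Struct::from_iter([Some(Literal::int(2))]));
assert_eq!(new_key.spec().spec_id(), base_key.spec().spec_id());
let _path = new_key.to_path(); // e.g. "id=2", depending on the spec's fields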
153 changes: 101 additions & 52 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -18,86 +18,126 @@
//! This module provides `DataFileWriter`.

use arrow_array::RecordBatch;
use itertools::Itertools;

use crate::Result;
use crate::spec::{DataContentType, DataFile, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::spec::{DataContentType, DataFile, PartitionKey};
use crate::writer::file_writer::FileWriterBuilder;
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `DataFileWriter`.
#[derive(Clone, Debug)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
partition_value: Option<Struct>,
partition_spec_id: i32,
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner: RollingFileWriterBuilder<B, L, F>,
partition_key: Option<PartitionKey>,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: i32) -> Self {
impl<B, L, F> DataFileWriterBuilder<B, L, F>
where
B: FileWriterBuilder,
L: LocationGenerator,
F: FileNameGenerator,
{
/// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`.
pub fn new(
inner_builder: RollingFileWriterBuilder<B, L, F>,
partition_key: Option<PartitionKey>,
) -> Self {
Self {
inner,
partition_value,
partition_spec_id,
inner: inner_builder,
partition_key,
}
}
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
type R = DataFileWriter<B>;
impl<B, L, F> IcebergWriterBuilder for DataFileWriterBuilder<B, L, F>
where
B: FileWriterBuilder,
L: LocationGenerator,
F: FileNameGenerator,
{
type R = DataFileWriter<B, L, F>;

async fn build(self) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
partition_value: self.partition_value.unwrap_or(Struct::empty()),
partition_spec_id: self.partition_spec_id,
inner: Some(self.inner.clone().build()),
partition_key: self.partition_key,
})
}
}

/// A writer that writes data within one spec/partition.
#[derive(Debug)]
pub struct DataFileWriter<B: FileWriterBuilder> {
inner_writer: Option<B::R>,
partition_value: Struct,
partition_spec_id: i32,
pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner: Option<RollingFileWriter<B, L, F>>,
partition_key: Option<PartitionKey>,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
impl<B, L, F> IcebergWriter for DataFileWriter<B, L, F>
where
B: FileWriterBuilder,
L: LocationGenerator,
F: FileNameGenerator,
{
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
self.inner_writer.as_mut().unwrap().write(&batch).await
if let Some(writer) = self.inner.as_mut() {
writer.write(&self.partition_key, &batch).await
} else {
Err(Error::new(
ErrorKind::Unexpected,
"Writer is not initialized!",
))
}
}

async fn close(&mut self) -> Result<Vec<DataFile>> {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::Data);
res.partition(self.partition_value.clone());
res.partition_spec_id(self.partition_spec_id);
res.build().expect("Guaranteed to be valid")
})
.collect_vec())
if let Some(writer) = self.inner.take() {
writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::Data);
if let Some(pk) = self.partition_key.as_ref() {
res.partition(pk.data().clone());
res.partition_spec_id(pk.spec().spec_id());
}
res.build().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
format!("Failed to build data file: {}", e),
)
})
})
.collect()
} else {
Err(Error::new(
ErrorKind::Unexpected,
"Data file writer has been closed.",
))
}
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
impl<B, L, F> CurrentFileStatus for DataFileWriter<B, L, F>
where
B: FileWriterBuilder,
L: LocationGenerator,
F: FileNameGenerator,
{
fn current_file_path(&self) -> String {
self.inner_writer.as_ref().unwrap().current_file_path()
self.inner.as_ref().unwrap().current_file_path()
}

fn current_row_num(&self) -> usize {
self.inner_writer.as_ref().unwrap().current_row_num()
self.inner.as_ref().unwrap().current_row_num()
}

fn current_written_size(&self) -> usize {
self.inner_writer.as_ref().unwrap().current_written_size()
self.inner.as_ref().unwrap().current_written_size()
}
}
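Putting the new pieces together (condensed from the tests below; assumes `schema_ref`, `file_io`, `location_gen`, `file_name_gen`, and a `batch: RecordBatch` are constructed as in those tests):

let parquet_builder =
    ParquetWriterBuilder::new(WriterProperties::builder().build(), schema_ref.clone());
let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
    parquet_builder,
    file_io.clone(),
    location_gen,
    file_name_gen,
);
// `None` means unpartitioned; pass `Some(partition_key)` to stamp the
// partition tuple and spec id onto the resulting `DataFile`s.
let mut writer = DataFileWriterBuilder::new(rolling_builder, None).build().await?;
writer.write(batch).await?;
let data_files = writer.close().await?;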

@@ -116,13 +156,15 @@ mod test {
use crate::Result;
use crate::io::FileIOBuilder;
use crate::spec::{
DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type,
DataContentType, DataFileFormat, Literal, NestedField, PartitionKey, PartitionSpec,
PrimitiveType, Schema, Struct, Type,
};
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};

#[tokio::test]
@@ -143,16 +185,16 @@
])
.build()?;

let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema),
None,
let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));

let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
pw,
file_io.clone(),
location_gen,
file_name_gen,
);

let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0)
let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None)
.build()
.await
.unwrap();
@@ -219,20 +261,27 @@
NestedField::required(6, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()?;
let schema_ref = Arc::new(schema);

let partition_value = Struct::from_iter([Some(Literal::int(1))]);
let partition_key = PartitionKey::new(
PartitionSpec::builder(schema_ref.clone()).build()?,
schema_ref.clone(),
partition_value.clone(),
);

let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema.clone()),
None,
let parquet_writer_builder =
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema_ref.clone());

let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
parquet_writer_builder,
file_io.clone(),
location_gen,
file_name_gen,
);

let mut data_file_writer =
DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()), 0)
DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
.build()
.await?;
