crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
/// NonZeroUsize::new(4).unwrap(),
/// )?;
/// ```
#[allow(dead_code)]
pub(crate) fn repartition(
input: Arc<dyn ExecutionPlan>,
table_metadata: TableMetadataRef,
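This hunk drops the `#[allow(dead_code)]` attribute now that `repartition` gains a real caller in `insert_into` (see `table/mod.rs` below). For orientation, here is a minimal sketch of a call site; the wrapper function is illustrative and not part of the PR, only the `repartition` invocation mirrors the code in this branch.

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion::error::Result as DFResult;
use datafusion::physical_plan::ExecutionPlan;
use iceberg::table::Table;

use crate::physical_plan::repartition::repartition;

// Illustrative wrapper (not part of the PR): spread a write plan across
// four output partitions according to the table's partition spec.
pub(crate) fn repartition_for_write(
    input: Arc<dyn ExecutionPlan>,
    table: &Table,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    repartition(
        input,
        table.metadata_ref(),
        NonZeroUsize::new(4).unwrap(), // as in the doc example above
    )
}
```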
crates/integrations/datafusion/src/physical_plan/write.rs (22 additions, 25 deletions)
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
execute_input_stream,
};
use futures::StreamExt;
use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
use iceberg::arrow::FieldMatchMode;
use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Error, ErrorKind};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;

use crate::physical_plan::DATA_FILES_COL_NAME;
use crate::task_writer::TaskWriter;
use crate::to_datafusion_error;

/// An execution plan node that writes data to an Iceberg table.
@@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec {
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
if !self
.table
.metadata()
.default_partition_spec()
.is_unpartitioned()
{
// TODO add support for partitioned tables
return Err(DataFusionError::NotImplemented(
"IcebergWriteExec does not support partitioned tables yet".to_string(),
));
}

let partition_type = self.table.metadata().default_partition_type().clone();
let format_version = self.table.metadata().format_version();

Expand Down Expand Up @@ -277,31 +265,40 @@ impl ExecutionPlan for IcebergWriteExec {
);
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

// Create TaskWriter
// TODO: Make fanout_enabled configurable via table properties
let fanout_enabled = true;
let schema = self.table.metadata().current_schema().clone();
let partition_spec = self.table.metadata().default_partition_spec().clone();
let task_writer = TaskWriter::new(
data_file_writer_builder,
fanout_enabled,
schema.clone(),
partition_spec,
);

// Get input data
let data = execute_input_stream(
Arc::clone(&self.input),
Arc::new(
schema_to_arrow_schema(self.table.metadata().current_schema())
.map_err(to_datafusion_error)?,
),
self.input.schema(), // input schema may have projected column `_partition`
partition,
Arc::clone(&context),
)?;

// Create write stream
let stream = futures::stream::once(async move {
let mut writer = data_file_writer_builder
// todo specify partition key when partitioning writer is supported
.build(None)
.await
.map_err(to_datafusion_error)?;
let mut task_writer = task_writer;
let mut input_stream = data;

while let Some(batch) = input_stream.next().await {
writer.write(batch?).await.map_err(to_datafusion_error)?;
let batch = batch?;
task_writer
.write(batch)
.await
.map_err(to_datafusion_error)?;
}

let data_files = writer.close().await.map_err(to_datafusion_error)?;
let data_files = task_writer.close().await.map_err(to_datafusion_error)?;

// Convert builders to data files and then to JSON strings
let data_files_strs: Vec<String> = data_files
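Taken together, these hunks replace the single unpartitioned `DataFileWriter` with a `TaskWriter` that is driven by the input stream. A condensed sketch of the resulting write loop, assuming `close()` returns the collected `Vec<DataFile>` as the surrounding diff suggests; the helper function and its name are illustrative:

```rust
use datafusion::error::DataFusionError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use iceberg::spec::DataFile;
use iceberg::writer::IcebergWriterBuilder;

use crate::task_writer::TaskWriter;
use crate::to_datafusion_error;

// Illustrative helper: drain the input stream through the TaskWriter and
// return the data files that execute() serializes into the output column.
async fn drain_input<B: IcebergWriterBuilder>(
    mut task_writer: TaskWriter<B>,
    mut input: SendableRecordBatchStream,
) -> Result<Vec<DataFile>, DataFusionError> {
    while let Some(batch) = input.next().await {
        // For partitioned tables each batch carries the projected `_partition` column.
        task_writer.write(batch?).await.map_err(to_datafusion_error)?;
    }
    task_writer.close().await.map_err(to_datafusion_error)
}
```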
crates/integrations/datafusion/src/table/mod.rs (28 additions, 15 deletions)
@@ -19,6 +19,7 @@ pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
@@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;

@@ -170,32 +173,42 @@

async fn insert_into(
&self,
_state: &dyn Session,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if !self
.table
.metadata()
.default_partition_spec()
.is_unpartitioned()
{
// TODO add insert into support for partitioned tables
return Err(DataFusionError::NotImplemented(
"IcebergTableProvider::insert_into does not support partitioned tables yet"
.to_string(),
));
}

let Some(catalog) = self.catalog.clone() else {
return Err(DataFusionError::Execution(
"Catalog cannot be none for insert_into".to_string(),
));
};

let partition_spec = self.table.metadata().default_partition_spec();

// Step 1: Project partition values for partitioned tables
let plan_with_partition = if !partition_spec.is_unpartitioned() {
project_with_partition(input, &self.table)?
} else {
input
};

// Step 2: Repartition for parallel processing
let target_partitions =
NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
DataFusionError::Configuration(
"target_partitions must be greater than 0".to_string(),
)
})?;

let repartitioned_plan = repartition(
plan_with_partition,
self.table.metadata_ref(),
target_partitions,
)?;

let write_plan = Arc::new(IcebergWriteExec::new(
self.table.clone(),
input,
repartitioned_plan,
self.schema.clone(),
));

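With the unpartitioned-only guard gone, an INSERT now plans as: project `_partition` (when the spec is partitioned), then repartition, then `IcebergWriteExec`. Callers on the DataFusion side are unaffected; a minimal usage sketch, assuming the provider was constructed with a catalog (the table names are illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result as DFResult;
use datafusion::prelude::SessionContext;

use crate::table::IcebergTableProvider;

// Illustrative: append rows to an Iceberg table (partitioned or not) via SQL.
async fn append_via_sql(
    ctx: &SessionContext,
    provider: Arc<IcebergTableProvider>,
) -> DFResult<()> {
    // The provider must hold a catalog; insert_into errors out otherwise.
    ctx.register_table("events", provider)?;
    // Planning this statement goes through IcebergTableProvider::insert_into.
    ctx.sql("INSERT INTO events SELECT * FROM staging")
        .await?
        .collect()
        .await?;
    Ok(())
}
```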
crates/integrations/datafusion/src/task_writer.rs (21 additions, 37 deletions)
@@ -34,7 +34,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
///
/// TaskWriter coordinates writing data to Iceberg tables by:
/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
/// - Lazily initializing the partition splitter on first write
/// - Initializing the partition splitter in the constructor for partitioned tables
/// - Routing data to the underlying writer
/// - Collecting all written data files
///
@@ -63,23 +63,17 @@
/// // Close and get data files
/// let data_files = task_writer.close().await?;
/// ```
#[allow(dead_code)]
pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
/// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
writer: SupportedWriter<B>,
/// Lazily initialized partition splitter for partitioned tables
/// Partition splitter for partitioned tables (initialized in constructor)
partition_splitter: Option<RecordBatchPartitionSplitter>,
/// Iceberg schema reference
schema: SchemaRef,
/// Partition specification reference
partition_spec: PartitionSpecRef,
}

/// Internal enum to hold the different writer types.
///
/// This enum allows TaskWriter to work with different partitioning strategies
/// while maintaining a unified interface.
#[allow(dead_code)]
enum SupportedWriter<B: IcebergWriterBuilder> {
/// Writer for unpartitioned tables
Unpartitioned(UnpartitionedWriter<B>),
@@ -89,7 +83,6 @@ enum SupportedWriter<B: IcebergWriterBuilder> {
Clustered(ClusteredWriter<B>),
}

#[allow(dead_code)]
impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// Create a new TaskWriter.
///
@@ -139,17 +132,29 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
};

// Initialize partition splitter in constructor for partitioned tables
let partition_splitter = if !partition_spec.is_unpartitioned() {
Some(
RecordBatchPartitionSplitter::new_with_precomputed_values(
schema.clone(),
partition_spec.clone(),
)
.expect("Failed to create partition splitter"),
)
} else {
None
};

Self {
writer,
partition_splitter: None,
schema,
partition_spec,
partition_splitter,
}
}

/// Write a RecordBatch to the TaskWriter.
///
/// For the first write to a partitioned table, this method initializes the partition splitter.
/// For partitioned tables, this method uses the partition splitter to split
/// the batch by partition key and routes each partition to the underlying writer.
/// For unpartitioned tables, data is written directly without splitting.
///
/// # Parameters
@@ -163,7 +168,6 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// # Errors
///
/// This method will return an error if:
/// - Partition splitter initialization fails
/// - Splitting the batch by partition fails
/// - Writing to the underlying writer fails
///
@@ -183,29 +187,9 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
writer.write(batch).await
}
SupportedWriter::Fanout(writer) => {
// Initialize splitter on first write if needed
if self.partition_splitter.is_none() {
self.partition_splitter =
Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
self.schema.clone(),
self.partition_spec.clone(),
)?);
}

// Split and write partitioned data
Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
}
SupportedWriter::Clustered(writer) => {
// Initialize splitter on first write if needed
if self.partition_splitter.is_none() {
self.partition_splitter =
Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
self.schema.clone(),
self.partition_spec.clone(),
)?);
}

// Split and write partitioned data
Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
}
}
@@ -214,13 +198,13 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// Helper method to split and write partitioned data.
///
/// This method handles the common logic for both FanoutWriter and ClusteredWriter:
/// - Splits the batch by partition key using the provided splitter
/// - Writes each partition to the underlying writer
/// - Splits the batch by partition key using the partition splitter
/// - Writes each partition to the underlying writer with its corresponding partition key
///
/// # Parameters
///
/// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter)
/// * `partition_splitter` - The partition splitter (must be initialized)
/// * `partition_splitter` - The partition splitter
/// * `batch` - The RecordBatch to write
///
/// # Returns
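The body of `write_partitioned_batches` is collapsed in this view. Based on the doc comment above, a plausible shape follows; the `split` return type, the `PartitioningWriter` bound, and the import paths are assumptions rather than facts from this diff:

```rust
use arrow_array::RecordBatch;
use iceberg::Result;
// Import paths below are approximate; the diff does not show the originals.
use iceberg::arrow::RecordBatchPartitionSplitter;
use iceberg::writer::partitioning::PartitioningWriter;

// Sketch only: assumes split() yields (partition key, sub-batch) pairs and
// that PartitioningWriter::write accepts the key alongside the batch.
async fn write_partitioned_batches<W: PartitioningWriter>(
    writer: &mut W,
    partition_splitter: &Option<RecordBatchPartitionSplitter>,
    batch: &RecordBatch,
) -> Result<()> {
    // The splitter is always Some here: the constructor builds it whenever
    // the partition spec is non-empty, and only partitioned tables reach
    // the fanout/clustered arms that call this helper.
    let splitter = partition_splitter
        .as_ref()
        .expect("partition splitter must be initialized for partitioned tables");

    // Split the batch into one sub-batch per partition key, then route each
    // sub-batch to the underlying writer together with its key.
    for (partition_key, partition_batch) in splitter.split(batch)? {
        writer.write(partition_key, partition_batch).await?;
    }
    Ok(())
}
```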