Feature Request / Improvement
Let's investigate the level of abstraction on the write path.
Currently, we do schema-compatibility checks, schema coercion, bin-packing, schema transformation, etc. at different levels of the stack. It would be good to optimize this and see which of these functions can be pushed up the stack.
For example, here's what the overwrite path looks like:

`overwrite` → `_dataframe_to_data_files` → `write_file` → `write_parquet`
(copied over from #910 (review))
Another example: #786 (comment)
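
To make the idea concrete, here is one hypothetical shape of such a consolidation. Every name below is illustrative rather than a proposal for the actual API: the schema check and coercion happen once at the top of the stack, so the lower layers only bin-pack and serialize already-normalized batches.

```python
# Hypothetical layering (all names illustrative): schema work happens once,
# at the top; lower layers only bin-pack and serialize normalized batches.
def overwrite(table, df, target_file_size):
    df = check_and_coerce_schema(table.schema(), df)  # single schema check + coercion
    data_files = dataframe_to_data_files(table, df, target_file_size)
    ...  # commit the data files in a new snapshot

def dataframe_to_data_files(table, df, target_file_size):
    for batches in bin_pack(df, target_file_size):  # only bin-packing here
        yield write_parquet(table, batches)

def write_parquet(table, batches):
    ...  # only serialization here; no schema checks or name sanitization
```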
More info
`overwrite` checks schema compatibility (iceberg-python/pyiceberg/table/__init__.py, lines 541 to 550 at 3f44dfe):

```python
_check_schema_compatible(
    self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
    # skip writing data files if the dataframe is empty
    if df.shape[0] > 0:
        data_files = _dataframe_to_data_files(
```
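
For reference, here is a minimal standalone sketch of what a check at this level amounts to: comparing incoming Arrow fields against the table schema by name and type. The function name and the exact rules are illustrative; pyiceberg's `_check_schema_compatible` also handles nested types, nullability, and timestamp downcasts.

```python
import pyarrow as pa

def check_schema_compatible(table_schema: pa.Schema, arrow_schema: pa.Schema) -> None:
    """Illustrative check: every incoming field must exist in the table schema
    with an equal type."""
    table_fields = {f.name: f.type for f in table_schema}
    problems = []
    for field in arrow_schema:
        expected = table_fields.get(field.name)
        if expected is None:
            problems.append(f"extra field: {field.name}")
        elif expected != field.type:
            problems.append(f"type mismatch for {field.name}: {field.type} != {expected}")
    if problems:
        raise ValueError("Schema is not compatible: " + "; ".join(problems))

# Passes: the incoming fields are a subset of the table schema with matching types.
check_schema_compatible(
    pa.schema([("id", pa.int64()), ("ts", pa.timestamp("us"))]),
    pa.schema([("id", pa.int64())]),
)
```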
`_dataframe_to_data_files` bin-packs the pyarrow Table into write tasks (iceberg-python/pyiceberg/io/pyarrow.py, lines 2222 to 2225 at 3f44dfe):

```python
tasks=iter([
    WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
    for batches in bin_pack_arrow_table(df, target_file_size)
]),
```
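
As a rough illustration of what the bin-packing step does (pyiceberg's `bin_pack_arrow_table` uses a packing iterator; this greedy version is only a sketch), record batches are accumulated until the next one would push the bin past the target file size, and each bin becomes one `WriteTask`:

```python
from typing import Iterator, List

import pyarrow as pa

def bin_pack_batches(tbl: pa.Table, target_file_size: int) -> Iterator[List[pa.RecordBatch]]:
    """Greedy sketch: start a new bin whenever adding the next batch would
    exceed target_file_size."""
    bin_: List[pa.RecordBatch] = []
    bin_size = 0
    for batch in tbl.to_batches():
        if bin_ and bin_size + batch.nbytes > target_file_size:
            yield bin_
            bin_, bin_size = [], 0
        bin_.append(batch)
        bin_size += batch.nbytes
    if bin_:
        yield bin_

# Each yielded list of batches would back one write task / one data file.
tbl = pa.table({"id": list(range(1_000_000))})
for batches in bin_pack_batches(tbl, target_file_size=512 * 1024 * 1024):
    print(len(batches), sum(b.nbytes for b in batches))
```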
`write_parquet` transforms the table schema (iceberg-python/pyiceberg/io/pyarrow.py, lines 2001 to 2008 at 3f44dfe):

```python
table_schema = task.schema
# if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
# otherwise use the original schema
if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
    file_schema = sanitized_schema
else:
    file_schema = table_schema
```
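
`sanitize_column_names` only matters when field names are not compatible with the downstream format. A minimal sketch of that kind of transformation (the underscore replacement scheme here is illustrative; pyiceberg's real sanitizer uses a different escaping scheme and also covers nested fields):

```python
import re

import pyarrow as pa

def sanitize_column_names(schema: pa.Schema) -> pa.Schema:
    """Illustrative sketch: replace characters that are invalid in Avro-style
    names with underscores, and prefix names that start with a digit."""
    def sanitize(name: str) -> str:
        name = re.sub(r"[^A-Za-z0-9_]", "_", name)
        return name if re.match(r"[A-Za-z_]", name) else "_" + name

    return pa.schema([field.with_name(sanitize(field.name)) for field in schema])

schema = pa.schema([("my col", pa.int64()), ("9lives", pa.string())])
print(sanitize_column_names(schema).names)  # ['my_col', '_9lives']
```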
It then coerces the record batches to the chosen file schema (iceberg-python/pyiceberg/io/pyarrow.py, lines 2011 to 2021 at 3f44dfe):

```python
batches = [
    _to_requested_schema(
        requested_schema=file_schema,
        file_schema=table_schema,
        batch=batch,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        include_field_ids=True,
    )
    for batch in task.record_batches
]
arrow_table = pa.Table.from_batches(batches)
```
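
The per-batch transformation boils down to casting each record batch to the requested file schema. Here is a minimal standalone sketch of that coercion, including the ns-to-us timestamp downcast (pyiceberg's `_to_requested_schema` additionally resolves fields by ID and attaches Iceberg field-id metadata, which this sketch omits):

```python
import pyarrow as pa

def to_requested_schema(
    requested_schema: pa.Schema,
    batch: pa.RecordBatch,
    downcast_ns_timestamp_to_us: bool = False,
) -> pa.RecordBatch:
    """Illustrative sketch: cast every column to the requested type, optionally
    downcasting nanosecond timestamps to microseconds first."""
    columns = []
    for field in requested_schema:
        column = batch.column(batch.schema.get_field_index(field.name))
        if (
            downcast_ns_timestamp_to_us
            and pa.types.is_timestamp(column.type)
            and column.type.unit == "ns"
        ):
            column = column.cast(pa.timestamp("us", tz=column.type.tz))
        columns.append(column.cast(field.type))
    return pa.RecordBatch.from_arrays(columns, schema=requested_schema)

batch = pa.RecordBatch.from_arrays([pa.array([0], type=pa.timestamp("ns"))], names=["ts"])
requested = pa.schema([("ts", pa.timestamp("us"))])
print(to_requested_schema(requested, batch, downcast_ns_timestamp_to_us=True))
```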