Skip to content

Commit 697a200

Browse files
Leonid Ryzhyk (ryzhyk)
and authored
datafusion: Create table provider for a snapshot. (#707)
* datafusion: Create table provider for a snapshot. The Iceberg table provider allows querying an Iceberg table via datafusion. The initial implementation only allowed querying the latest snapshot of the table. It is sometimes useful to query a specific snapshot (time travel). This commit adds this capability. It adds a new method (`try_new_from_table_snapshot`) that creates a provider for a specific table snapshot. All existing APIs should work as before. Signed-off-by: Leonid Ryzhyk <[email protected]> * datafusion: use Snapshot::schema, not schema_id(). Apply @liurenjie1024's suggestion: use `Snapshot::schema` instead of retrieving the schema directly by id (which can be missing in the snapshot). Signed-off-by: Leonid Ryzhyk <[email protected]> --------- Signed-off-by: Leonid Ryzhyk <[email protected]> Co-authored-by: Leonid Ryzhyk <[email protected]>
1 parent b2fb803 commit 697a200

File tree

2 files changed

+82
-6
lines changed
  • crates/integrations/datafusion/src

2 files changed

+82
-6
lines changed

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ use crate::to_datafusion_error;
4343
pub(crate) struct IcebergTableScan {
4444
/// A table in the catalog.
4545
table: Table,
46+
/// Snapshot of the table to scan.
47+
snapshot_id: Option<i64>,
4648
/// A reference-counted arrow `Schema`.
4749
schema: ArrowSchemaRef,
4850
/// Stores certain, often expensive to compute,
@@ -58,6 +60,7 @@ impl IcebergTableScan {
5860
/// Creates a new [`IcebergTableScan`] object.
5961
pub(crate) fn new(
6062
table: Table,
63+
snapshot_id: Option<i64>,
6164
schema: ArrowSchemaRef,
6265
projection: Option<&Vec<usize>>,
6366
filters: &[Expr],
@@ -68,6 +71,7 @@ impl IcebergTableScan {
6871

6972
Self {
7073
table,
74+
snapshot_id,
7175
schema,
7276
plan_properties,
7377
projection,
@@ -119,6 +123,7 @@ impl ExecutionPlan for IcebergTableScan {
119123
) -> DFResult<SendableRecordBatchStream> {
120124
let fut = get_batch_stream(
121125
self.table.clone(),
126+
self.snapshot_id,
122127
self.projection.clone(),
123128
self.predicates.clone(),
124129
);
@@ -157,12 +162,18 @@ impl DisplayAs for IcebergTableScan {
157162
/// and then converts it into a stream of Arrow [`RecordBatch`]es.
158163
async fn get_batch_stream(
159164
table: Table,
165+
snapshot_id: Option<i64>,
160166
column_names: Option<Vec<String>>,
161167
predicates: Option<Predicate>,
162168
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
169+
let scan_builder = match snapshot_id {
170+
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
171+
None => table.scan(),
172+
};
173+
163174
let mut scan_builder = match column_names {
164-
Some(column_names) => table.scan().select(column_names),
165-
None => table.scan().select_all(),
175+
Some(column_names) => scan_builder.select(column_names),
176+
None => scan_builder.select_all(),
166177
};
167178
if let Some(pred) = predicates {
168179
scan_builder = scan_builder.with_filter(pred);

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
2929
use datafusion::physical_plan::ExecutionPlan;
3030
use iceberg::arrow::schema_to_arrow_schema;
3131
use iceberg::table::Table;
32-
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
32+
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
3333

3434
use crate::physical_plan::scan::IcebergTableScan;
3535

@@ -39,13 +39,19 @@ use crate::physical_plan::scan::IcebergTableScan;
3939
pub struct IcebergTableProvider {
4040
/// A table in the catalog.
4141
table: Table,
42+
/// Table snapshot id that will be queried via this provider.
43+
snapshot_id: Option<i64>,
4244
/// A reference-counted arrow `Schema`.
4345
schema: ArrowSchemaRef,
4446
}
4547

4648
impl IcebergTableProvider {
4749
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
48-
IcebergTableProvider { table, schema }
50+
IcebergTableProvider {
51+
table,
52+
snapshot_id: None,
53+
schema,
54+
}
4955
}
5056
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
5157
/// using the given client and table name to fetch an actual [`Table`]
@@ -60,14 +66,46 @@ impl IcebergTableProvider {
6066

6167
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
6268

63-
Ok(IcebergTableProvider { table, schema })
69+
Ok(IcebergTableProvider {
70+
table,
71+
snapshot_id: None,
72+
schema,
73+
})
6474
}
6575

6676
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
6777
/// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
6878
pub async fn try_new_from_table(table: Table) -> Result<Self> {
6979
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
70-
Ok(IcebergTableProvider { table, schema })
80+
Ok(IcebergTableProvider {
81+
table,
82+
snapshot_id: None,
83+
schema,
84+
})
85+
}
86+
87+
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
88+
/// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
89+
pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
90+
let snapshot = table
91+
.metadata()
92+
.snapshot_by_id(snapshot_id)
93+
.ok_or_else(|| {
94+
Error::new(
95+
ErrorKind::Unexpected,
96+
format!(
97+
"snapshot id {snapshot_id} not found in table {}",
98+
table.identifier().name()
99+
),
100+
)
101+
})?;
102+
let schema = snapshot.schema(table.metadata())?;
103+
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
104+
Ok(IcebergTableProvider {
105+
table,
106+
snapshot_id: Some(snapshot_id),
107+
schema,
108+
})
71109
}
72110
}
73111

@@ -94,6 +132,7 @@ impl TableProvider for IcebergTableProvider {
94132
) -> DFResult<Arc<dyn ExecutionPlan>> {
95133
Ok(Arc::new(IcebergTableScan::new(
96134
self.table.clone(),
135+
self.snapshot_id,
97136
self.schema.clone(),
98137
projection,
99138
filters,
@@ -162,4 +201,30 @@ mod tests {
162201
let has_column = df_schema.has_column(&Column::from_name("z"));
163202
assert!(has_column);
164203
}
204+
205+
#[tokio::test]
206+
async fn test_try_new_from_table_snapshot() {
207+
let table = get_test_table_from_metadata_file().await;
208+
let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
209+
let table_provider =
210+
IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
211+
.await
212+
.unwrap();
213+
let ctx = SessionContext::new();
214+
ctx.register_table("mytable", Arc::new(table_provider))
215+
.unwrap();
216+
let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
217+
let df_schema = df.schema();
218+
let df_columns = df_schema.fields();
219+
assert_eq!(df_columns.len(), 3);
220+
let x_column = df_columns.first().unwrap();
221+
let column_data = format!(
222+
"{:?}:{:?}",
223+
x_column.name(),
224+
x_column.data_type().to_string()
225+
);
226+
assert_eq!(column_data, "\"x\":\"Int64\"");
227+
let has_column = df_schema.has_column(&Column::from_name("z"));
228+
assert!(has_column);
229+
}
165230
}

0 commit comments

Comments
 (0)