From 160ca673cb53adca5e2012e46bc81be0fc79a6f1 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 21:09:46 +0900 Subject: [PATCH 1/7] Allow resolving the current snapshot ID to use on a scan from a callback function --- .../integrations/datafusion/src/table/mod.rs | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 00c9e13229..6c6ce9818c 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,9 +24,10 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::Result as DFResult; +use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; +use futures::future::BoxFuture; use iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; @@ -35,7 +36,7 @@ use crate::physical_plan::scan::IcebergTableScan; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct IcebergTableProvider { /// A table in the catalog. table: Table, @@ -43,6 +44,18 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// A function that returns a future of the snapshot id to query. + snapshot_id_fn: Option BoxFuture<'static, Result> + Send + Sync>>, +} + +impl std::fmt::Debug for IcebergTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IcebergTableProvider") + .field("table", &self.table) + .field("snapshot_id", &self.snapshot_id) + .field("schema", &self.schema) + .finish_non_exhaustive() + } } impl IcebergTableProvider { @@ -51,6 +64,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + snapshot_id_fn: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -69,6 +83,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, + snapshot_id_fn: None, schema, }) } @@ -80,6 +95,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, + snapshot_id_fn: None, schema, }) } @@ -104,6 +120,36 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), + snapshot_id_fn: None, + schema, + }) + } + + /// Asynchronously tries to construct a new [`IcebergTableProvider`] + /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. + pub async fn try_new_from_table_snapshot_fn( + table: Table, + snapshot_fn: Arc BoxFuture<'static, Result> + Send + Sync>, + ) -> Result { + let snapshot_id = snapshot_fn().await?; + let snapshot = table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "snapshot id {snapshot_id} not found in table {}", + table.identifier().name() + ), + ) + })?; + let schema = snapshot.schema(table.metadata())?; + let schema = Arc::new(schema_to_arrow_schema(&schema)?); + Ok(IcebergTableProvider { + table, + snapshot_id: None, + snapshot_id_fn: Some(snapshot_fn), schema, }) } @@ -130,6 +176,13 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { + let snapshot_id = if let Some(snapshot_id_fn) = &self.snapshot_id_fn { + Some(snapshot_id_fn().await.map_err(|e| { + DataFusionError::Execution(format!("Error getting Iceberg snapshot id: {e}")) + })?) + } else { + self.snapshot_id + }; Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.snapshot_id, From 990ebd6c3316821cd7bf7ca42041e2f851c83767 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:13:01 +0900 Subject: [PATCH 2/7] Use table_fn --- .../integrations/datafusion/src/table/mod.rs | 46 ++++++------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6c6ce9818c..86be1d835c 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -44,8 +44,8 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, - /// A function that returns a future of the snapshot id to query. - snapshot_id_fn: Option BoxFuture<'static, Result> + Send + Sync>>, + /// A function that returns a future of the table metadata to query. + table_fn: Option BoxFuture<'static, Result> + Send + Sync>>, } impl std::fmt::Debug for IcebergTableProvider { @@ -64,7 +64,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, - snapshot_id_fn: None, + table_fn: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -83,7 +83,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, - snapshot_id_fn: None, + table_fn: None, schema, }) } @@ -95,7 +95,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, - snapshot_id_fn: None, + table_fn: None, schema, }) } @@ -120,36 +120,22 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), - snapshot_id_fn: None, + table_fn: None, schema, }) } /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. - pub async fn try_new_from_table_snapshot_fn( - table: Table, - snapshot_fn: Arc BoxFuture<'static, Result> + Send + Sync>, + pub async fn try_new_from_table_fn( + table_fn: Arc BoxFuture<'static, Result
> + Send + Sync>, ) -> Result { - let snapshot_id = snapshot_fn().await?; - let snapshot = table - .metadata() - .snapshot_by_id(snapshot_id) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - format!( - "snapshot id {snapshot_id} not found in table {}", - table.identifier().name() - ), - ) - })?; - let schema = snapshot.schema(table.metadata())?; - let schema = Arc::new(schema_to_arrow_schema(&schema)?); + let table = table_fn().await?; + let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { table, snapshot_id: None, - snapshot_id_fn: Some(snapshot_fn), + table_fn: Some(table_fn), schema, }) } @@ -176,15 +162,13 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let snapshot_id = if let Some(snapshot_id_fn) = &self.snapshot_id_fn { - Some(snapshot_id_fn().await.map_err(|e| { - DataFusionError::Execution(format!("Error getting Iceberg snapshot id: {e}")) - })?) + let table = if let Some(table_fn) = &self.table_fn { + table_fn().await? } else { - self.snapshot_id + self.table.clone() }; Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), + table, self.snapshot_id, self.schema.clone(), projection, From c72c0bee85406fda71a6916e30fea76a99725eab Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:17:17 +0900 Subject: [PATCH 3/7] Fix --- crates/integrations/datafusion/src/table/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 86be1d835c..6c9cfbc0f0 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -163,7 +163,9 @@ impl TableProvider for IcebergTableProvider { _limit: Option, ) -> DFResult> { let table = if let Some(table_fn) = &self.table_fn { - table_fn().await? + table_fn().await.map_err(|e| { + DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) + })? } else { self.table.clone() }; From df84ed00f36be9ddaed3facdf101ec119a7adec6 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:30:49 +0900 Subject: [PATCH 4/7] Just pass a reference to the catalog --- .../integrations/datafusion/src/table/mod.rs | 38 +++++++------------ 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6c9cfbc0f0..08a775bb85 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -44,8 +44,8 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, - /// A function that returns a future of the table metadata to query. - table_fn: Option BoxFuture<'static, Result
> + Send + Sync>>, + /// A reference to the catalog that this table provider belongs to. + catalog: Option>, } impl std::fmt::Debug for IcebergTableProvider { @@ -64,7 +64,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, - table_fn: None, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -83,7 +83,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, - table_fn: None, + catalog: Some(client), schema, }) } @@ -95,7 +95,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, - table_fn: None, + catalog: None, schema, }) } @@ -120,22 +120,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), - table_fn: None, - schema, - }) - } - - /// Asynchronously tries to construct a new [`IcebergTableProvider`] - /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. - pub async fn try_new_from_table_fn( - table_fn: Arc BoxFuture<'static, Result
> + Send + Sync>, - ) -> Result { - let table = table_fn().await?; - let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - Ok(IcebergTableProvider { - table, - snapshot_id: None, - table_fn: Some(table_fn), + catalog: None, schema, }) } @@ -162,10 +147,13 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let table = if let Some(table_fn) = &self.table_fn { - table_fn().await.map_err(|e| { - DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) - })? + let table = if let Some(catalog) = &self.catalog { + catalog + .load_table(&self.table.identifier()) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) + })? } else { self.table.clone() }; From 01a64452d855e37ba9ab18f061282f06ab0b6a58 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:32:07 +0900 Subject: [PATCH 5/7] make public --- crates/integrations/datafusion/src/table/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 08a775bb85..e6fa45e95d 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -70,7 +70,7 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given client and table name to fetch an actual [`Table`] /// in the provided namespace. - pub(crate) async fn try_new( + pub async fn try_new( client: Arc, namespace: NamespaceIdent, name: impl Into, From 01eee48cbcde1251f0b2b5a3ee5555979fa2b4cc Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:34:23 +0900 Subject: [PATCH 6/7] Just take table ident --- crates/integrations/datafusion/src/table/mod.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index e6fa45e95d..b922794acd 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -70,13 +70,8 @@ impl IcebergTableProvider { /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given client and table name to fetch an actual [`Table`] /// in the provided namespace. - pub async fn try_new( - client: Arc, - namespace: NamespaceIdent, - name: impl Into, - ) -> Result { - let ident = TableIdent::new(namespace, name.into()); - let table = client.load_table(&ident).await?; + pub async fn try_new(client: Arc, table_name: TableIdent) -> Result { + let table = client.load_table(&table_name).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); From 2d01d56ffa606022caf4e3b6ce0f5169984ac6ae Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 5 May 2025 22:40:14 +0900 Subject: [PATCH 7/7] lint --- crates/integrations/datafusion/src/schema.rs | 7 +++++-- crates/integrations/datafusion/src/table/mod.rs | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3be6da426e..4528017976 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, TableIdent}; use crate::table::IcebergTableProvider; @@ -64,7 +64,10 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + let table_ident = TableIdent::new(namespace.clone(), name.clone()); + IcebergTableProvider::try_new(client.clone(), table_ident) + }) .collect::>(), ) .await?; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index b922794acd..dcb3d5fc65 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -27,10 +27,9 @@ use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; -use futures::future::BoxFuture; use iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent}; use crate::physical_plan::scan::IcebergTableScan; @@ -142,9 +141,10 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { + // Get the latest table metadata from the catalog if it exists let table = if let Some(catalog) = &self.catalog { catalog - .load_table(&self.table.identifier()) + .load_table(self.table.identifier()) .await .map_err(|e| { DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}"))