diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 3be6da426e..4528017976 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, TableIdent}; use crate::table::IcebergTableProvider; @@ -64,7 +64,10 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + let table_ident = TableIdent::new(namespace.clone(), name.clone()); + IcebergTableProvider::try_new(client.clone(), table_ident) + }) .collect::<Vec<_>>(), ) .await?; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 00c9e13229..dcb3d5fc65 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,18 +24,18 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::Result as DFResult; +use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use iceberg::arrow::schema_to_arrow_schema; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent}; use crate::physical_plan::scan::IcebergTableScan; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. 
-#[derive(Debug, Clone)] +#[derive(Clone)] pub struct IcebergTableProvider { /// A table in the catalog. table: Table, @@ -43,6 +43,18 @@ pub struct IcebergTableProvider { snapshot_id: Option<i64>, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// A reference to the catalog that this table provider belongs to. + catalog: Option<Arc<dyn Catalog>>, +} + +impl std::fmt::Debug for IcebergTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IcebergTableProvider") + .field("table", &self.table) + .field("snapshot_id", &self.snapshot_id) + .field("schema", &self.schema) + .finish_non_exhaustive() + } } impl IcebergTableProvider { @@ -51,24 +63,21 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] /// using the given client and table name to fetch an actual [`Table`] /// in the provided namespace. - pub(crate) async fn try_new( - client: Arc<dyn Catalog>, - namespace: NamespaceIdent, - name: impl Into<String>, - ) -> Result<Self> { - let ident = TableIdent::new(namespace, name.into()); - let table = client.load_table(&ident).await?; + pub async fn try_new(client: Arc<dyn Catalog>, table_name: TableIdent) -> Result<Self> { + let table = client.load_table(&table_name).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { table, snapshot_id: None, + catalog: Some(client), schema, }) } @@ -80,6 +89,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: None, + catalog: None, schema, }) } @@ -104,6 +114,7 @@ impl IcebergTableProvider { Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), + catalog: None, schema, }) } @@ -130,8 +141,19 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option<usize>, ) -> DFResult<Arc<dyn ExecutionPlan>> { + // Get the latest table metadata from the catalog if it exists + let table = if let Some(catalog) = &self.catalog { + catalog + 
.load_table(self.table.identifier()) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}")) + })? + } else { + self.table.clone() + }; Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), + table, self.snapshot_id, self.schema.clone(), projection,