Skip to content

Commit 16529b5

Browse files
Support retrieving the latest Iceberg table on table scan (#11)
* Allow resolving the current snapshot ID to use on a scan from a callback function * Use table_fn * Fix * Just pass a reference to the catalog * make public * Just take table ident * lint
1 parent 0b2d278 commit 16529b5

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

crates/integrations/datafusion/src/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::catalog::SchemaProvider;
2424
use datafusion::datasource::TableProvider;
2525
use datafusion::error::Result as DFResult;
2626
use futures::future::try_join_all;
27-
use iceberg::{Catalog, NamespaceIdent, Result};
27+
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
2828

2929
use crate::table::IcebergTableProvider;
3030

@@ -64,7 +64,10 @@ impl IcebergSchemaProvider {
6464
let providers = try_join_all(
6565
table_names
6666
.iter()
67-
.map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
67+
.map(|name| {
68+
let table_ident = TableIdent::new(namespace.clone(), name.clone());
69+
IcebergTableProvider::try_new(client.clone(), table_ident)
70+
})
6871
.collect::<Vec<_>>(),
6972
)
7073
.await?;

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,37 @@ use async_trait::async_trait;
2424
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
2525
use datafusion::catalog::Session;
2626
use datafusion::datasource::{TableProvider, TableType};
27-
use datafusion::error::Result as DFResult;
27+
use datafusion::error::{DataFusionError, Result as DFResult};
2828
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
2929
use datafusion::physical_plan::ExecutionPlan;
3030
use iceberg::arrow::schema_to_arrow_schema;
3131
use iceberg::table::Table;
32-
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
32+
use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent};
3333

3434
use crate::physical_plan::scan::IcebergTableScan;
3535

3636
/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
3737
/// managing access to a [`Table`].
38-
#[derive(Debug, Clone)]
38+
#[derive(Clone)]
3939
pub struct IcebergTableProvider {
4040
/// A table in the catalog.
4141
table: Table,
4242
/// Table snapshot id that will be queried via this provider.
4343
snapshot_id: Option<i64>,
4444
/// A reference-counted arrow `Schema`.
4545
schema: ArrowSchemaRef,
46+
/// A reference to the catalog that this table provider belongs to.
47+
catalog: Option<Arc<dyn Catalog>>,
48+
}
49+
50+
impl std::fmt::Debug for IcebergTableProvider {
51+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52+
f.debug_struct("IcebergTableProvider")
53+
.field("table", &self.table)
54+
.field("snapshot_id", &self.snapshot_id)
55+
.field("schema", &self.schema)
56+
.finish_non_exhaustive()
57+
}
4658
}
4759

4860
impl IcebergTableProvider {
@@ -51,24 +63,21 @@ impl IcebergTableProvider {
5163
table,
5264
snapshot_id: None,
5365
schema,
66+
catalog: None,
5467
}
5568
}
5669
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
5770
/// using the given client and table name to fetch an actual [`Table`]
5871
/// in the provided namespace.
59-
pub(crate) async fn try_new(
60-
client: Arc<dyn Catalog>,
61-
namespace: NamespaceIdent,
62-
name: impl Into<String>,
63-
) -> Result<Self> {
64-
let ident = TableIdent::new(namespace, name.into());
65-
let table = client.load_table(&ident).await?;
72+
pub async fn try_new(client: Arc<dyn Catalog>, table_name: TableIdent) -> Result<Self> {
73+
let table = client.load_table(&table_name).await?;
6674

6775
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
6876

6977
Ok(IcebergTableProvider {
7078
table,
7179
snapshot_id: None,
80+
catalog: Some(client),
7281
schema,
7382
})
7483
}
@@ -80,6 +89,7 @@ impl IcebergTableProvider {
8089
Ok(IcebergTableProvider {
8190
table,
8291
snapshot_id: None,
92+
catalog: None,
8393
schema,
8494
})
8595
}
@@ -104,6 +114,7 @@ impl IcebergTableProvider {
104114
Ok(IcebergTableProvider {
105115
table,
106116
snapshot_id: Some(snapshot_id),
117+
catalog: None,
107118
schema,
108119
})
109120
}
@@ -130,8 +141,19 @@ impl TableProvider for IcebergTableProvider {
130141
filters: &[Expr],
131142
_limit: Option<usize>,
132143
) -> DFResult<Arc<dyn ExecutionPlan>> {
144+
// Get the latest table metadata from the catalog if it exists
145+
let table = if let Some(catalog) = &self.catalog {
146+
catalog
147+
.load_table(self.table.identifier())
148+
.await
149+
.map_err(|e| {
150+
DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}"))
151+
})?
152+
} else {
153+
self.table.clone()
154+
};
133155
Ok(Arc::new(IcebergTableScan::new(
134-
self.table.clone(),
156+
table,
135157
self.snapshot_id,
136158
self.schema.clone(),
137159
projection,

0 commit comments

Comments
 (0)