From a12824c6f74c6ed41dcb99d38baf5aae889e9613 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 26 Feb 2025 03:21:58 +0200 Subject: [PATCH] affected rows count for insert --- datafusion_iceberg/src/table.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index e3990b5f..46ba4b17 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -602,6 +602,8 @@ impl DataSink for IcebergDataSink { ) .await?; + let count = metadata_files.iter().map(|x|x.record_count()).fold(0, |acc, x| acc+ x); + table .new_transaction(self.0.branch.as_deref()) .append(metadata_files) @@ -609,7 +611,7 @@ impl DataSink for IcebergDataSink { .await .map_err(DataFusionIcebergError::from)?; - Ok(0) + Ok(count as u64) } fn metrics(&self) -> Option { None @@ -953,7 +955,7 @@ mod tests { ctx.register_table("orders", table.clone()).unwrap(); - ctx.sql( + let res = ctx.sql( "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES (1, 1, 1, '2020-01-01', 1), (2, 2, 1, '2020-01-01', 1), @@ -968,6 +970,14 @@ mod tests { .await .expect("Failed to insert values into table"); + let count = res[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values()[0]; + assert_eq!(count, 6); + let batches = ctx .sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;") .await