diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index e3990b5f..46ba4b17 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -602,6 +602,8 @@ impl DataSink for IcebergDataSink { ) .await?; + let count = metadata_files.iter().map(|x|x.record_count()).fold(0, |acc, x| acc+ x); + table .new_transaction(self.0.branch.as_deref()) .append(metadata_files) @@ -609,7 +611,7 @@ impl DataSink for IcebergDataSink { .await .map_err(DataFusionIcebergError::from)?; - Ok(0) + Ok(count as u64) } fn metrics(&self) -> Option { None @@ -953,7 +955,7 @@ mod tests { ctx.register_table("orders", table.clone()).unwrap(); - ctx.sql( + let res = ctx.sql( "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES (1, 1, 1, '2020-01-01', 1), (2, 2, 1, '2020-01-01', 1), @@ -968,6 +970,14 @@ mod tests { .await .expect("Failed to insert values into table"); + let count = res[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values()[0]; + assert_eq!(count, 6); + let batches = ctx .sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;") .await