Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,16 @@ impl DataSink for IcebergDataSink {
)
.await?;

let count = metadata_files.iter().map(|x|x.record_count()).fold(0, |acc, x| acc+ x);

table
.new_transaction(self.0.branch.as_deref())
.append(metadata_files)
.commit()
.await
.map_err(DataFusionIcebergError::from)?;

Ok(0)
Ok(count as u64)
}
fn metrics(&self) -> Option<MetricsSet> {
None
Expand Down Expand Up @@ -953,7 +955,7 @@ mod tests {

ctx.register_table("orders", table.clone()).unwrap();

ctx.sql(
let res = ctx.sql(
"INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES
(1, 1, 1, '2020-01-01', 1),
(2, 2, 1, '2020-01-01', 1),
Expand All @@ -968,6 +970,14 @@ mod tests {
.await
.expect("Failed to insert values into table");

let count = res[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.values()[0];
assert_eq!(count, 6);

let batches = ctx
.sql("select product_id, sum(amount) from orders where customer_id = 1 group by product_id;")
.await
Expand Down