From c906fd3a2ef5aa49cd173c1afef8d81e421dd360 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Tue, 18 Feb 2025 02:12:53 +0200 Subject: [PATCH 1/4] insert count --- datafusion/physical-plan/src/insert.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index c3ad0977ed2b..542950606b15 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -224,8 +224,16 @@ impl ExecutionPlan for DataSinkExec { if partition != 0 { return internal_err!("DataSinkExec can only be called on partition 0!"); } + let input = Arc::clone(&self.input); + // get statistics (that is cheap according to docs) for the RecordBatch as can't use + // async stream returned by execute_input_stream as it can be consumed only once + let num_rows = match input.statistics() { + Ok(stats) => *stats.num_rows.get_value().unwrap_or(&0), + Err(err) => return Err(err), + }; + let data = execute_input_stream( - Arc::clone(&self.input), + input, Arc::clone(self.sink.schema()), 0, Arc::clone(&context), @@ -235,7 +243,9 @@ impl ExecutionPlan for DataSinkExec { let sink = Arc::clone(&self.sink); let stream = futures::stream::once(async move { - sink.write_all(data, &context).await.map(make_count_batch) + sink.write_all(data, &context) + .await + .map(|_| make_count_batch(num_rows as u64)) }) .boxed(); @@ -262,7 +272,7 @@ impl ExecutionPlan for DataSinkExec { /// ``` fn make_count_batch(count: u64) -> RecordBatch { let array = Arc::new(Int64Array::from(vec![count as i64])) as ArrayRef; - + println!("array: {array:?}"); RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap() } From c3cf884338095815f86d615eb6252cb76740b244 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Tue, 18 Feb 2025 04:01:47 +0200 Subject: [PATCH 2/4] rm print --- datafusion/physical-plan/src/insert.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 542950606b15..de6ab01d05d8 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -272,7 +272,6 @@ impl ExecutionPlan for DataSinkExec { /// ``` fn make_count_batch(count: u64) -> RecordBatch { let array = Arc::new(Int64Array::from(vec![count as i64])) as ArrayRef; - println!("array: {array:?}"); RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap() } From cc9d8f7b6a2a13d285ab9f30c4edbbe8686d6ef9 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Tue, 25 Feb 2025 17:26:00 +0200 Subject: [PATCH 3/4] Revert "return Int64 instead of UInt64 when returning count (#17)" This reverts commit 314a72634dc4f15242345fd17dc32df6c3ae04e8. --- datafusion/core/tests/user_defined/insert_operation.rs | 2 +- datafusion/expr/src/logical_plan/dml.rs | 3 +-- datafusion/physical-plan/src/insert.rs | 7 ++++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 919c3fee3f5a..aa531632c60b 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -184,7 +184,7 @@ impl ExecutionPlan for TestInsertExec { fn make_count_schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new( "count", - DataType::Int64, // should return signed int for snowflake + DataType::UInt64, false, )])) } diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 1c9838214e08..669bc8e8a7d3 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -203,8 +203,7 @@ impl Display for InsertOp { fn make_count_schema() -> DFSchemaRef { Arc::new( - // should return signed int for snowflake - Schema::new(vec![Field::new("count", DataType::Int64, false)]) + Schema::new(vec![Field::new("count", DataType::UInt64, false)]) .try_into() .unwrap(), ) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index de6ab01d05d8..3d7393a48b0b 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -32,7 +32,7 @@ use crate::ExecutionPlanProperties; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow_array::{ArrayRef, Int64Array}; +use arrow_array::{ArrayRef, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; @@ -271,7 +271,8 @@ impl ExecutionPlan for DataSinkExec { /// +-------+, /// ``` fn make_count_batch(count: u64) -> RecordBatch { - let array = Arc::new(Int64Array::from(vec![count as i64])) as ArrayRef; + let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap() } @@ -279,7 +280,7 @@ fn make_count_schema() -> SchemaRef { // Define a schema. Arc::new(Schema::new(vec![Field::new( "count", - DataType::Int64, // should return signed int for snowflake + DataType::UInt64, false, )])) } From bc556f00ad6b5f43753f4a80156a893129560f81 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 26 Feb 2025 01:04:20 +0200 Subject: [PATCH 4/4] revert changes as of bad approach --- datafusion/physical-plan/src/insert.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 3d7393a48b0b..bfb1e9d53df5 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -224,16 +224,8 @@ impl ExecutionPlan for DataSinkExec { if partition != 0 { return internal_err!("DataSinkExec can only be called on partition 0!"); } - let input = Arc::clone(&self.input); - // get statistics (that is cheap according to docs) for the RecordBatch as can't use - // async stream returned by execute_input_stream as it can be consumed only once - let num_rows = match input.statistics() { - Ok(stats) => *stats.num_rows.get_value().unwrap_or(&0), - Err(err) => return Err(err), - }; - let data = execute_input_stream( - input, + Arc::clone(&self.input), Arc::clone(self.sink.schema()), 0, Arc::clone(&context), @@ -243,9 +235,7 @@ impl ExecutionPlan for DataSinkExec { let sink = Arc::clone(&self.sink); let stream = futures::stream::once(async move { - sink.write_all(data, &context) - .await - .map(|_| make_count_batch(num_rows as u64)) + sink.write_all(data, &context).await.map(make_count_batch) }) .boxed();