From 029c84e2888b657fa9f994cf168a6ff8b66dd727 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 11 Apr 2025 21:44:38 +0800 Subject: [PATCH 1/2] refactor --- .../src/expressions/dynamic_filters.rs | 228 ++++++++---------- 1 file changed, 102 insertions(+), 126 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index c0a3285f0e78..859d0e1f053e 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -23,13 +23,15 @@ use std::{ }; use crate::PhysicalExpr; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, Result, }; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash, PhysicalExprRef}; + +use super::Column; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. #[derive(Debug)] @@ -38,12 +40,13 @@ pub struct DynamicFilterPhysicalExpr { /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) /// and later remapped to the actual expressions that are being filtered. /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly. - children: Vec>, - /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children - /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. - remapped_children: Option>>, + // columns: Vec>, + // /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children + // /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. + remapped_schema: Option, /// The source of dynamic filters. - inner: Arc>>, + inner: Arc>, + /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. /// But this can have overhead in production, so it's only included in our tests. 
@@ -53,20 +56,13 @@ pub struct DynamicFilterPhysicalExpr { impl Hash for DynamicFilterPhysicalExpr { fn hash(&self, state: &mut H) { - let inner = self.current().expect("Failed to get current expression"); - inner.dyn_hash(state); - self.children.dyn_hash(state); - self.remapped_children.dyn_hash(state); + todo!("") } } impl PartialEq for DynamicFilterPhysicalExpr { fn eq(&self, other: &Self) -> bool { - let inner = self.current().expect("Failed to get current expression"); - let our_children = self.remapped_children.as_ref().unwrap_or(&self.children); - let other_children = other.remapped_children.as_ref().unwrap_or(&other.children); - let other = other.current().expect("Failed to get current expression"); - inner.dyn_eq(other.as_any()) && our_children == other_children + todo!("") } } @@ -105,50 +101,34 @@ impl DynamicFilterPhysicalExpr { /// [`collect_columns`]: crate::utils::collect_columns #[allow(dead_code)] // Only used in tests for now pub fn new( - children: Vec>, + // children: Vec>, inner: Arc, ) -> Self { Self { - children, - remapped_children: None, // Initially no remapped children + // columns: children, + // remapped_columns: None, // Initially no remapped children + remapped_schema: None, + // remapped_filter: None, inner: Arc::new(RwLock::new(inner)), data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } } - fn remap_children( - children: &[Arc], - remapped_children: Option<&Vec>>, - expr: Arc, - ) -> Result> { - if let Some(remapped_children) = remapped_children { - // Remap the children to the new children - // of the expression. - expr.transform_up(|child| { - // Check if this is any of our original children - if let Some(pos) = - children.iter().position(|c| c.as_ref() == child.as_ref()) - { - // If so, remap it to the current children - // of the expression. - let new_child = Arc::clone(&remapped_children[pos]); - Ok(Transformed::yes(new_child)) - } else { - // Otherwise, just return the expression - Ok(Transformed::no(child)) - } - }) - .data() - } else { - // If we don't have any remapped children, just return the expression - Ok(Arc::clone(&expr)) + // udpate schema + pub fn with_schema( + &self, + schema: SchemaRef, + ) -> Self { + Self { + remapped_schema: Some(schema), + inner: Arc::clone(&self.inner), + data_type: Arc::clone(&self.data_type), + nullable: Arc::clone(&self.nullable), } } - /// Get the current expression. - /// This will return the current expression with any children - /// remapped to match calls to [`PhysicalExpr::with_new_children`]. + // get the source filter pub fn current(&self) -> Result> { let inner = self .inner @@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr { ) })? .clone(); - let inner = - Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?; Ok(inner) } - /// Update the current expression. - /// Any children of this expression must be a subset of the original children - /// passed to the constructor. - /// This should be called e.g.: - /// - When we've computed the probe side's hash table in a HashJoinExec - /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach. 
- #[allow(dead_code)] // Only used in tests for now - pub fn update(&self, new_expr: Arc) -> Result<()> { - let mut current = self.inner.write().map_err(|_| { - datafusion_common::DataFusionError::Execution( - "Failed to acquire write lock for inner".to_string(), - ) - })?; - // Remap the children of the new expression to match the original children - // We still do this again in `current()` but doing it preventively here - // reduces the work needed in some cases if `current()` is called multiple times - // and the same externally facing `PhysicalExpr` is used for both `with_new_children` and `update()`.` - let new_expr = Self::remap_children( - &self.children, - self.remapped_children.as_ref(), - new_expr, - )?; - *current = new_expr; - Ok(()) + // update source filter + pub fn update(&self, filter: PhysicalExprRef) { + let mut w = self.inner.write().unwrap(); + *w = filter; } } @@ -197,24 +155,18 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } fn children(&self) -> Vec<&Arc> { - self.remapped_children - .as_ref() - .unwrap_or(&self.children) - .iter() - .collect() + todo!("") } + // update source filter fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { - Ok(Arc::new(Self { - children: self.children.clone(), - remapped_children: Some(children), - inner: Arc::clone(&self.inner), - data_type: Arc::clone(&self.data_type), - nullable: Arc::clone(&self.nullable), - })) + debug_assert_eq!(children.len(), 1); + let inner = children.swap_remove(0); + self.update(inner); + Ok(self) } fn data_type(&self, input_schema: &Schema) -> Result { @@ -287,10 +239,35 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { inner.fmt_sql(f) } - fn snapshot(&self) -> Result>> { - // Return the current expression as a snapshot. - Ok(Some(self.current()?)) + // snapshot with given schema based on the source filter. + // only evalute is expected to be called after this output. no schema or source filter are updated for the snapshot. + fn snapshot(&self) -> Result> { + if let Some(remapped_schema) = self.remapped_schema.as_ref() { + let pred = self.current()?; + let new_pred = pred.transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = match remapped_schema.index_of(col.name()) { + Ok(idx) => idx, + Err(_) => return Err(datafusion_common::DataFusionError::Plan( + format!("Column {} not found in schema", col.name()), + )), + }; + return Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + index, + )))); + } else { + // If the expression is not a column, just return it + return Ok(Transformed::no(expr)); + } + }).data()?; + + Ok(Some(new_pred)) + } else { + Ok(Some(self.current()?)) + } } + } #[cfg(test)] @@ -304,6 +281,7 @@ mod test { datatypes::{DataType, Field, Schema}, }; use datafusion_common::ScalarValue; + use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use super::*; @@ -319,7 +297,7 @@ mod test { lit(42) as Arc, )); let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( - vec![col("a", &table_schema).unwrap()], + // vec![col("a", &table_schema).unwrap()], expr as Arc, )); // Simulate two `ParquetSource` files with different filter schemas @@ -335,22 +313,12 @@ mod test { ])); // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr // and remaps the children to the file schema. 
- let dynamic_filter_1 = reassign_predicate_columns( - Arc::clone(&dynamic_filter) as Arc, - &filter_schema_1, - false, - ) - .unwrap(); - let snap = dynamic_filter_1.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); - let dynamic_filter_2 = reassign_predicate_columns( - Arc::clone(&dynamic_filter) as Arc, - &filter_schema_2, - false, - ) - .unwrap(); - let snap = dynamic_filter_2.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1)); + let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap(); + insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2)); + let snap_2 = dynamic_filter_2.snapshot().unwrap().unwrap(); + insta::assert_snapshot!(format!("{snap_2:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); // Both filters allow evaluating the same expression let batch_1 = RecordBatch::try_new( Arc::clone(&filter_schema_1), @@ -373,8 +341,8 @@ mod test { ) .unwrap(); // Evaluate the expression on both batches - let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); - let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + let result_1 = snap_1.evaluate(&batch_1).unwrap(); + let result_2 = snap_2.evaluate(&batch_2).unwrap(); // Check that the results are the same let ColumnarValue::Array(arr_1) = result_1 else { panic!("Expected ColumnarValue::Array"); @@ -393,13 +361,18 @@ mod test { col("a", &table_schema).unwrap(), datafusion_expr::Operator::Gt, lit(43) as Arc, - )); - dynamic_filter - .update(Arc::clone(&new_expr) as Arc) - .expect("Failed to update expression"); + )) as PhysicalExprRef; + + dynamic_filter.with_new_children(vec![new_expr]) + .expect("Failed to update children"); + + let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap(); + let snap_2 = dynamic_filter_2.snapshot().unwrap().unwrap(); + // Now we should be able to evaluate the new expression on both batches - let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); - let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + let result_1 = snap_1.evaluate(&batch_1).unwrap(); + let result_2 = snap_2.evaluate(&batch_2).unwrap(); + // Check that the results are the same let ColumnarValue::Array(arr_1) = result_1 else { panic!("Expected ColumnarValue::Array"); @@ -417,24 +390,26 @@ mod test { #[test] fn test_snapshot() { let expr = lit(42) as Arc; - let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + let dynamic_filter = DynamicFilterPhysicalExpr::new(Arc::clone(&expr)); // Take a snapshot of the current expression - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(expr)); + let snapshot = dynamic_filter.snapshot().unwrap().unwrap(); + assert_eq!(&snapshot, &expr); // Update the current expression let new_expr = lit(100) as Arc; - dynamic_filter.update(Arc::clone(&new_expr)).unwrap(); + // dynamic_filter.with_new_children(vec![new_expr.clone()]) + // .expect("Failed to update 
expression"); + dynamic_filter.update(Arc::clone(&new_expr)); // Take another snapshot - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(new_expr)); + let snapshot = dynamic_filter.snapshot().unwrap().unwrap(); + assert_eq!(&snapshot, &new_expr); } #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { let dynamic_filter = - DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc); + DynamicFilterPhysicalExpr::new(lit(42)); // First call to data_type and nullable should set the initial values. let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); @@ -454,8 +429,7 @@ mod test { // Now change the current expression to something else. dynamic_filter - .update(lit(ScalarValue::Utf8(None)) as Arc) - .expect("Failed to update expression"); + .update(lit(ScalarValue::Utf8(None))); // Check that we error if we call data_type, nullable or evaluate after changing the expression. assert!( dynamic_filter.data_type(&Schema::empty()).is_err(), @@ -466,9 +440,11 @@ mod test { "Expected err when nullable is called after changing the expression." ); let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + + let snap = dynamic_filter.snapshot().unwrap().unwrap(); + // this is changed to ok, but makes sense assert!( - dynamic_filter.evaluate(&batch).is_err(), - "Expected err when evaluate is called after changing the expression." + snap.evaluate(&batch).is_ok(), ); } } From 438292ac67043ec089d6fd5ce8c15ad115dffd27 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 11 Apr 2025 22:19:57 +0800 Subject: [PATCH 2/2] DynamicFilterPhysicalExpr is PhysicalExprRef --- .../physical-expr-common/src/physical_expr.rs | 10 +- .../src/expressions/dynamic_filters.rs | 162 +++++++++--------- 2 files changed, 90 insertions(+), 82 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 3bc41d2652d9..38c87b9bc6e4 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -25,7 +25,7 @@ use crate::utils::scatter; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; @@ -327,7 +327,10 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// /// Note for implementers: this method should *not* handle recursion. /// Recursion is handled in [`snapshot_physical_expr`]. - fn snapshot(&self) -> Result>> { + fn snapshot( + &self, + remapped_schema: Option, + ) -> Result>> { // By default, we return None to indicate that this PhysicalExpr does not // have any dynamic references or state. // This is a safe default behavior. @@ -513,9 +516,10 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { /// any dynamic references or state, it returns `None`. pub fn snapshot_physical_expr( expr: Arc, + schema: Option, ) -> Result> { expr.transform_up(|e| { - if let Some(snapshot) = e.snapshot()? { + if let Some(snapshot) = e.snapshot(schema.clone())? 
{ Ok(Transformed::yes(snapshot)) } else { Ok(Transformed::no(Arc::clone(&e))) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 859d0e1f053e..10d63bd150d7 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -36,16 +36,8 @@ use super::Column; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. #[derive(Debug)] pub struct DynamicFilterPhysicalExpr { - /// The original children of this PhysicalExpr, if any. - /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) - /// and later remapped to the actual expressions that are being filtered. - /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly. - // columns: Vec>, - // /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children - // /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. - remapped_schema: Option, /// The source of dynamic filters. - inner: Arc>, + inner: PhysicalExprRef, /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. @@ -105,47 +97,44 @@ impl DynamicFilterPhysicalExpr { inner: Arc, ) -> Self { Self { - // columns: children, - // remapped_columns: None, // Initially no remapped children - remapped_schema: None, - // remapped_filter: None, - inner: Arc::new(RwLock::new(inner)), + inner, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } } // udpate schema - pub fn with_schema( - &self, - schema: SchemaRef, - ) -> Self { - Self { - remapped_schema: Some(schema), - inner: Arc::clone(&self.inner), - data_type: Arc::clone(&self.data_type), - nullable: Arc::clone(&self.nullable), - } - } + // pub fn with_schema(&self, schema: SchemaRef) -> Self { + // Self { + // remapped_schema: Some(schema), + // inner: Arc::clone(&self.inner), + // data_type: Arc::clone(&self.data_type), + // nullable: Arc::clone(&self.nullable), + // } + // } // get the source filter pub fn current(&self) -> Result> { - let inner = self - .inner - .read() - .map_err(|_| { - datafusion_common::DataFusionError::Execution( - "Failed to acquire read lock for inner".to_string(), - ) - })? - .clone(); + let inner = Arc::clone(&self.inner); + + // let inner = self + // .inner + // .read() + // .map_err(|_| { + // datafusion_common::DataFusionError::Execution( + // "Failed to acquire read lock for inner".to_string(), + // ) + // })? + // .clone(); Ok(inner) } // update source filter - pub fn update(&self, filter: PhysicalExprRef) { - let mut w = self.inner.write().unwrap(); - *w = filter; + // create a new one + pub fn update(&mut self, filter: PhysicalExprRef) { + self.inner = filter; + // let mut w = self.inner.write().unwrap(); + // *w = filter; } } @@ -165,8 +154,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { ) -> Result> { debug_assert_eq!(children.len(), 1); let inner = children.swap_remove(0); - self.update(inner); - Ok(self) + Ok(Arc::new(Self::new(inner))) } fn data_type(&self, input_schema: &Schema) -> Result { @@ -241,33 +229,39 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // snapshot with given schema based on the source filter. 
// only evalute is expected to be called after this output. no schema or source filter are updated for the snapshot. - fn snapshot(&self) -> Result> { - if let Some(remapped_schema) = self.remapped_schema.as_ref() { + fn snapshot( + &self, + remapped_schema: Option, + ) -> Result> { + if let Some(remapped_schema) = remapped_schema { let pred = self.current()?; - let new_pred = pred.transform_up(|expr| { - if let Some(col) = expr.as_any().downcast_ref::() { - let index = match remapped_schema.index_of(col.name()) { - Ok(idx) => idx, - Err(_) => return Err(datafusion_common::DataFusionError::Plan( - format!("Column {} not found in schema", col.name()), - )), - }; - return Ok(Transformed::yes(Arc::new(Column::new( - col.name(), - index, - )))); - } else { - // If the expression is not a column, just return it - return Ok(Transformed::no(expr)); - } - }).data()?; + let new_pred = pred + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = match remapped_schema.index_of(col.name()) { + Ok(idx) => idx, + Err(_) => { + return Err(datafusion_common::DataFusionError::Plan( + format!("Column {} not found in schema", col.name()), + )) + } + }; + return Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + index, + )))); + } else { + // If the expression is not a column, just return it + return Ok(Transformed::no(expr)); + } + }) + .data()?; Ok(Some(new_pred)) } else { Ok(Some(self.current()?)) } } - } #[cfg(test)] @@ -313,11 +307,15 @@ mod test { ])); // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr // and remaps the children to the file schema. - let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1)); - let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap(); + let snap_1 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_1))) + .unwrap() + .unwrap(); insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); - let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2)); - let snap_2 = dynamic_filter_2.snapshot().unwrap().unwrap(); + let snap_2 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_2))) + .unwrap() + .unwrap(); insta::assert_snapshot!(format!("{snap_2:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); // Both filters allow evaluating the same expression let batch_1 = RecordBatch::try_new( @@ -363,11 +361,19 @@ mod test { lit(43) as Arc, )) as PhysicalExprRef; - dynamic_filter.with_new_children(vec![new_expr]) + let dynamic_filter = dynamic_filter + .with_new_children(vec![new_expr]) .expect("Failed to update children"); + // dynamic_filter.update(new_expr); - let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap(); - let snap_2 = dynamic_filter_2.snapshot().unwrap().unwrap(); + let snap_1 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_1))) + .unwrap() + .unwrap(); + let snap_2 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_2))) + .unwrap() + .unwrap(); // Now we should be able to evaluate the new expression on both batches let result_1 = snap_1.evaluate(&batch_1).unwrap(); @@ -393,23 +399,24 @@ mod test { let dynamic_filter = DynamicFilterPhysicalExpr::new(Arc::clone(&expr)); // Take a snapshot of the current expression - let snapshot = dynamic_filter.snapshot().unwrap().unwrap(); + let snapshot = 
dynamic_filter.snapshot(None).unwrap().unwrap(); assert_eq!(&snapshot, &expr); // Update the current expression let new_expr = lit(100) as Arc; - // dynamic_filter.with_new_children(vec![new_expr.clone()]) - // .expect("Failed to update expression"); - dynamic_filter.update(Arc::clone(&new_expr)); + let df = Arc::new(dynamic_filter) as PhysicalExprRef; + let df = df + .with_new_children(vec![new_expr.clone()]) + .expect("Failed to update expression"); + // dynamic_filter.update(Arc::clone(&new_expr)); // Take another snapshot - let snapshot = dynamic_filter.snapshot().unwrap().unwrap(); + let snapshot = df.snapshot(None).unwrap().unwrap(); assert_eq!(&snapshot, &new_expr); } #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { - let dynamic_filter = - DynamicFilterPhysicalExpr::new(lit(42)); + let mut dynamic_filter = DynamicFilterPhysicalExpr::new(lit(42)); // First call to data_type and nullable should set the initial values. let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); @@ -428,8 +435,7 @@ mod test { ); // Now change the current expression to something else. - dynamic_filter - .update(lit(ScalarValue::Utf8(None))); + dynamic_filter.update(lit(ScalarValue::Utf8(None))); // Check that we error if we call data_type, nullable or evaluate after changing the expression. assert!( dynamic_filter.data_type(&Schema::empty()).is_err(), @@ -441,10 +447,8 @@ mod test { ); let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); - let snap = dynamic_filter.snapshot().unwrap().unwrap(); + let snap = dynamic_filter.snapshot(None).unwrap().unwrap(); // this is changed to ok, but makes sense - assert!( - snap.evaluate(&batch).is_ok(), - ); + assert!(snap.evaluate(&batch).is_ok(),); } }
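
A minimal usage sketch of the API shape after these two patches, not code taken from either commit: it assumes the `snapshot(Option<SchemaRef>)` signature and the by-value `update(&mut self, ...)` introduced above, and the schemas, column names, literal values, and the `datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr` import path are illustrative assumptions.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;

fn dynamic_filter_snapshot_sketch() -> Result<()> {
    // Logical table schema the predicate is written against (illustrative).
    let table_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    // A file schema where column `a` sits at a different index than in the table schema.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("b", DataType::Utf8, false),
        Field::new("a", DataType::Int32, false),
    ]));

    // Start with a placeholder predicate, then swap in the real one once it is known
    // (e.g. after the probe side of a join or the TopK heap has been built).
    let mut filter = DynamicFilterPhysicalExpr::new(lit(true));
    filter.update(Arc::new(BinaryExpr::new(
        col("a", &table_schema)?,
        Operator::Eq,
        lit(42),
    )) as PhysicalExprRef);

    // Snapshot against the file schema: `a` is remapped from index 0 to index 1 by name.
    let snapshot = filter
        .snapshot(Some(Arc::clone(&file_schema)))?
        .expect("a dynamic filter always yields a snapshot");

    // The snapshot is a plain PhysicalExpr and is evaluated directly; the dynamic
    // filter itself is not consulted again after this point.
    let batch = RecordBatch::try_new(
        file_schema,
        vec![
            Arc::new(StringArray::from(vec!["x"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![42])) as ArrayRef,
        ],
    )?;
    let _mask = snapshot.evaluate(&batch)?;
    Ok(())
}

Callers that only hold an `Arc<dyn PhysicalExpr>` would instead go through `snapshot_physical_expr(expr, Some(schema))` from `physical_expr.rs`, which applies the same per-node snapshot recursively via `transform_up`; either way the column remapping is resolved lazily at snapshot time from the schema rather than being tracked up front in `remapped_children`.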