Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 14 additions & 24 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,43 +582,33 @@ impl protobuf::PhysicalPlanNode {
) -> Result<Arc<dyn ExecutionPlan>> {
let input: Arc<dyn ExecutionPlan> =
into_physical_plan(&filter.input, ctx, runtime, extension_codec)?;
let projection = if !filter.projection.is_empty() {
Some(
filter
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
None
};

// Use the projected schema if projection is present, otherwise use the full schema
let predicate_schema = if let Some(ref proj_indices) = projection {
// Create projected schema for parsing the predicate
let projected_fields: Vec<_> = proj_indices
.iter()
.map(|&i| input.schema().field(i).clone())
.collect();
Arc::new(Schema::new(projected_fields))
} else {
input.schema()
};

let predicate = filter
.expr
.as_ref()
.map(|expr| {
parse_physical_expr(expr, ctx, predicate_schema.as_ref(), extension_codec)
parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the input schema always matches my understanding of how the FilterExec work 👍

})
.transpose()?
.ok_or_else(|| {
DataFusionError::Internal(
"filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
)
})?;

let filter_selectivity = filter.default_filter_selectivity.try_into();
let projection = if !filter.projection.is_empty() {
Some(
filter
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
None
};

let filter =
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
match filter_selectivity {
Expand Down
34 changes: 33 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,39 @@ async fn test_round_trip_date_part_display() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this test covers the code as it fails without the fix



---- cases::roundtrip_physical_plan::test_tpch_part_in_list_query_with_real_parquet_data stdout ----
Error: Internal("PhysicalExpr Column references column 'p_size' at index 1 (zero-based) but input schema only has 1 columns: [\"p_size\"]")

use datafusion_common::test_util::datafusion_test_data;

let ctx = SessionContext::new();

// Register the TPC-H part table using the local test data
let test_data = datafusion_test_data();
let table_sql = format!(
"CREATE EXTERNAL TABLE part STORED AS PARQUET LOCATION '{test_data}/tpch_part_small.parquet'"
);
ctx.sql(&table_sql).await.map_err(|e| {
DataFusionError::External(format!("Failed to create part table: {e}").into())
})?;

// Test the exact problematic query
let sql =
"SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31) and p_partkey > 1000";

let logical_plan = ctx.sql(sql).await?.into_unoptimized_plan();
let optimized_plan = ctx.state().optimize(&logical_plan)?;
let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?;

// Serialize the physical plan - bug may happen here already but not necessarily manifests
let codec = DefaultPhysicalExtensionCodec {};
let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?;

// This will fail with the bug, but should succeed when fixed
let _deserialized_plan =
proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
Ok(())
}

#[tokio::test]
/// Tests that we can serialize an unoptimized "analyze" plan and it will work on the other end
async fn analyze_roundtrip_unoptimized() -> Result<()> {
Expand Down Expand Up @@ -2090,6 +2123,5 @@ async fn analyze_roundtrip_unoptimized() -> Result<()> {
let physical_planner =
datafusion::physical_planner::DefaultPhysicalPlanner::default();
physical_planner.optimize_physical_plan(unoptimized, &session_state, |_, _| {})?;

Ok(())
}