Skip to content

Commit 2e8e07c

Browse files
committed
fix plan to query_graph
1 parent ee1e17b commit 2e8e07c

File tree

2 files changed

+113
-4
lines changed

2 files changed

+113
-4
lines changed

datafusion/optimizer/src/reorder_join/left_deep_join_plan.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -665,14 +665,27 @@ mod tests {
665665
.limit(0, Some(100))?
666666
.build()?;
667667

668-
let query_graph = QueryGraph::try_from(plan).unwrap();
668+
// Extract the join subtree and wrappers
669+
let (join_subtree, wrappers) =
670+
crate::reorder_join::query_graph::extract_join_subtree(plan).unwrap();
669671

670-
let optimized_plan =
672+
// Convert join subtree to query graph
673+
let query_graph = QueryGraph::try_from(join_subtree).unwrap();
674+
675+
// Optimize the joins
676+
let optimized_joins =
671677
optimal_left_deep_join_plan(query_graph, Rc::new(TestCostEstimator)).unwrap();
672678

679+
// Reconstruct the full plan with wrappers
680+
let optimized_plan =
681+
crate::reorder_join::query_graph::reconstruct_plan(optimized_joins, wrappers)
682+
.unwrap();
683+
684+
println!("Optimized Plan:");
673685
println!("{}", optimized_plan.display_indent());
674686

675-
panic!();
687+
// Verify the plan structure
688+
assert!(matches!(optimized_plan, LogicalPlan::Limit(_)));
676689

677690
Ok(())
678691
}

datafusion/optimizer/src/reorder_join/query_graph.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,108 @@ impl QueryGraph {
155155
}
156156
}
157157

158+
/// Extracts the join subtree from a logical plan, separating it from wrapper operators.
159+
///
160+
/// This function traverses the plan tree from the root downward, collecting all non-join
161+
/// operators until it finds the topmost join node. The join subtree (all consecutive joins)
162+
/// is extracted and returned separately from the wrapper operators.
163+
///
164+
/// # Arguments
165+
///
166+
/// * `plan` - The logical plan to extract from
167+
///
168+
/// # Returns
169+
///
170+
/// Returns a tuple of (join_subtree, wrapper_operators) where:
171+
/// - `join_subtree` is the topmost join and all joins beneath it
172+
/// - `wrapper_operators` is a vector of non-join operators above the joins, in order from root to join
173+
///
174+
/// # Errors
175+
///
176+
/// Returns an error if the plan doesn't contain any joins.
177+
pub(crate) fn extract_join_subtree(
178+
plan: LogicalPlan,
179+
) -> Result<(LogicalPlan, Vec<LogicalPlan>)> {
180+
let mut wrappers = Vec::new();
181+
let mut current = plan;
182+
183+
// Descend through non-join nodes until we find a join
184+
loop {
185+
match current {
186+
LogicalPlan::Join(_) => {
187+
// Found the join subtree root
188+
return Ok((current, wrappers));
189+
}
190+
other => {
191+
// Check if this node contains joins in its children
192+
if !contains_join(&other) {
193+
return plan_err!(
194+
"Plan does not contain any join nodes: {}",
195+
other.display()
196+
);
197+
}
198+
199+
// This node is a wrapper - store it and descend to its child
200+
// For now, we only support single-child wrappers (Filter, Sort, Limit, Aggregate, etc.)
201+
let inputs = other.inputs();
202+
if inputs.len() != 1 {
203+
return plan_err!(
204+
"Join extraction only supports single-input operators, found {} inputs in: {}",
205+
inputs.len(),
206+
other.display()
207+
);
208+
}
209+
210+
wrappers.push(other.clone());
211+
current = (*inputs[0]).clone();
212+
}
213+
}
214+
}
215+
}
216+
217+
/// Reconstructs a logical plan by wrapping an optimized join plan with the original wrapper operators.
218+
///
219+
/// This function takes an optimized join plan and re-applies the wrapper operators (Filter, Sort,
220+
/// Aggregate, etc.) that were removed during extraction. The wrappers are applied in reverse order
221+
/// (innermost to outermost) to reconstruct the original plan structure.
222+
///
223+
/// # Arguments
224+
///
225+
/// * `join_plan` - The optimized join plan to wrap
226+
/// * `wrappers` - Vector of wrapper operators in order from outermost to innermost (root to join)
227+
///
228+
/// # Returns
229+
///
230+
/// Returns the fully reconstructed logical plan with all wrapper operators reapplied.
231+
///
232+
/// # Errors
233+
///
234+
/// Returns an error if reconstructing any wrapper operator fails.
235+
pub(crate) fn reconstruct_plan(
236+
join_plan: LogicalPlan,
237+
wrappers: Vec<LogicalPlan>,
238+
) -> Result<LogicalPlan> {
239+
let mut current = join_plan;
240+
241+
// Apply wrappers in reverse order (from innermost to outermost)
242+
for wrapper in wrappers.into_iter().rev() {
243+
// Use with_new_exprs to reconstruct the wrapper with the new input
244+
current = wrapper.with_new_exprs(wrapper.expressions(), vec![current])?;
245+
}
246+
247+
Ok(current)
248+
}
249+
158250
impl TryFrom<LogicalPlan> for QueryGraph {
159251
type Error = DataFusionError;
160252

161253
fn try_from(value: LogicalPlan) -> Result<Self, Self::Error> {
254+
// First, extract the join subtree from any wrapper operators
255+
let (join_subtree, _wrappers) = extract_join_subtree(value)?;
256+
257+
// Now convert only the join subtree to a query graph
162258
let mut query_graph = QueryGraph::new();
163-
flatten_joins_recursive(value, &mut query_graph)?;
259+
flatten_joins_recursive(join_subtree, &mut query_graph)?;
164260
Ok(query_graph)
165261
}
166262
}

0 commit comments

Comments
 (0)