diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index 82f2d75ac1b5..67251088af12 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -26,7 +26,7 @@ use rand::{rng, Rng};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use tempfile::{Builder, NamedTempFile, TempDir};
+use tempfile::{Builder, NamedTempFile, TempDir, TempPath};
 
 use crate::memory_pool::human_readable_size;
 
@@ -370,6 +370,19 @@ impl RefCountedTempFile {
     pub fn current_disk_usage(&self) -> u64 {
         self.current_file_disk_usage
     }
+
+    /// Reopens the underlying file and returns a new `RefCountedTempFile`
+    /// handle sharing the same path, parent temp dir, and disk manager, so
+    /// the same spilled file can be read by more than one stream.
+    pub fn clone_refcounted(&self) -> Result<Self> {
+        let reopened = std::fs::File::open(self.path())?;
+        let temp_path = TempPath::from_path(self.path());
+        Ok(Self {
+            _parent_temp_dir: Arc::clone(&self._parent_temp_dir),
+            tempfile: NamedTempFile::from_parts(reopened, temp_path),
+            current_file_disk_usage: self.current_file_disk_usage,
+            disk_manager: Arc::clone(&self.disk_manager),
+        })
+    }
 }
 
 /// When the temporary file is dropped, subtract its disk usage from the disk manager's total
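The reopen trick in `clone_refcounted` is plain `tempfile` machinery. Below is a minimal, self-contained sketch of the same mechanism without any DataFusion types, including the ownership caveat that the DataFusion wrapper handles via the ref-counted parent temp dir:

```rust
use std::io::Write;
use tempfile::{NamedTempFile, TempPath};

fn main() -> std::io::Result<()> {
    let mut original = NamedTempFile::new()?;
    writeln!(original, "spilled bytes")?;

    // Reopen the same path with a fresh file descriptor, as
    // `clone_refcounted` does above.
    let reopened = std::fs::File::open(original.path())?;
    let temp_path = TempPath::from_path(original.path());
    let clone = NamedTempFile::from_parts(reopened, temp_path);

    // Both handles now point at the same underlying path. Note that
    // whichever `TempPath` drops first deletes the file, which is why the
    // DataFusion wrapper also keeps the parent temp dir alive and tracks
    // disk usage separately.
    assert_eq!(original.path(), clone.path());
    Ok(())
}
```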
diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs
new file mode 100644
index 000000000000..2c9482f93f89
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/grace_hash_join/exec.rs
@@ -0,0 +1,1239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution_plan::{boundedness_from_children, EmissionType};
+use crate::filter_pushdown::{
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
+};
+use crate::joins::utils::{reorder_output_after_swap, swap_join_projection, OnceFut};
+use crate::joins::{JoinOn, JoinOnRef, PartitionMode};
+use crate::projection::{
+    try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
+    ProjectionExec,
+};
+use crate::spill::get_record_batch_memory_size;
+use crate::{
+    common::can_project,
+    joins::utils::{
+        build_join_schema, check_join_is_valid, estimate_join_statistics,
+        symmetric_join_output_partitioning,
+        BuildProbeJoinMetrics, ColumnIndex, JoinFilter,
+    },
+    metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
+    PlanProperties, SendableRecordBatchStream, Statistics,
+};
+use crate::{ExecutionPlanProperties, SpillManager};
+use std::fmt;
+use std::fmt::Formatter;
+use std::sync::{Arc, OnceLock};
+use std::{any::Any, vec};
+
+use arrow::array::UInt32Array;
+use arrow::compute::{concat_batches, take};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{
+    internal_err, plan_err, project_schema, JoinSide, JoinType,
+    NullEquality, Result,
+};
+use datafusion_execution::TaskContext;
+use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
+use datafusion_physical_expr::equivalence::{
+    join_equivalence_properties, ProjectionMapping,
+};
+use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
+
+use crate::joins::grace_hash_join::stream::{
+    GraceAccumulator, GraceHashJoinStream, SpillFut,
+};
+use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
+use crate::metrics::SpillMetrics;
+use crate::spill::spill_manager::SpillLocation;
+use ahash::RandomState;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use futures::StreamExt;
+
+/// Hard-coded seed so that hash values in the join differ from those of
+/// `RepartitionExec`, keeping the Grace bucket assignment uncorrelated with
+/// the upstream repartitioning.
+const HASH_JOIN_SEED: RandomState =
+    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
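Both inputs must agree on which Grace bucket a given key value falls into, which is exactly what the fixed seed buys. A small self-contained sketch of that invariant using the same `create_hashes` helper (the bucket count of 4 is arbitrary):

```rust
use ahash::RandomState;
use arrow::array::{ArrayRef, Int32Array};
use datafusion_common::hash_utils::create_hashes;
use std::sync::Arc;

fn main() -> datafusion_common::Result<()> {
    let seed = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

    // Hash the same key values once per side, as partition_and_spill does.
    let left_keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let right_keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let mut left_hashes = vec![0u64; 3];
    let mut right_hashes = vec![0u64; 3];
    create_hashes(&[left_keys], &seed, &mut left_hashes)?;
    create_hashes(&[right_keys], &seed, &mut right_hashes)?;

    // Same seed + same key => same hash => same bucket, so a matching
    // build/probe pair always lands in the same spilled Grace partition.
    let parts = 4;
    for (l, r) in left_hashes.iter().zip(&right_hashes) {
        assert_eq!((*l as usize) % parts, (*r as usize) % parts);
    }
    Ok(())
}
```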
+pub struct GraceHashJoinExec {
+    /// left (build) side which gets hashed
+    pub left: Arc<dyn ExecutionPlan>,
+    /// right (probe) side which is filtered by the hash table
+    pub right: Arc<dyn ExecutionPlan>,
+    /// Set of equijoin columns from the relations: `(left_col, right_col)`
+    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+    /// Filters which are applied while finding matching rows
+    pub filter: Option<JoinFilter>,
+    /// How the join is performed (`OUTER`, `INNER`, etc)
+    pub join_type: JoinType,
+    /// The schema after the join. Be careful when using this schema: if there
+    /// is a projection, it is not the same as the output schema.
+    join_schema: SchemaRef,
+    /// Shared `RandomState` for the hashing algorithm
+    random_state: RandomState,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// The projection indices of the columns in the output schema of the join
+    pub projection: Option<Vec<usize>>,
+    /// Information of index and left / right placement of columns
+    column_indices: Vec<ColumnIndex>,
+    /// The equality null-handling behavior of the join algorithm.
+    pub null_equality: NullEquality,
+    /// Cache holding plan properties like equivalences, output partitioning etc.
+    cache: PlanProperties,
+    /// Dynamic filter for pushing down to the probe side.
+    /// Set when dynamic filter pushdown is detected in `handle_child_pushdown_result`;
+    /// the shared bounds accumulator coordinates updates across partitions.
+    dynamic_filter: Option<HashJoinExecDynamicFilter>,
+    /// Barrier collecting the spilled partition indexes from every output
+    /// partition before the join phase starts.
+    accumulator: Arc<GraceAccumulator>,
+}
+
+#[derive(Clone)]
+struct HashJoinExecDynamicFilter {
+    /// Dynamic filter that we'll update with the results of the build side once that is done.
+    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// It is lazily initialized during execution to make sure we use the actual execution-time partition counts.
+    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+}
+
+impl fmt::Debug for GraceHashJoinExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("GraceHashJoinExec")
+            .field("left", &self.left)
+            .field("right", &self.right)
+            .field("on", &self.on)
+            .field("filter", &self.filter)
+            .field("join_type", &self.join_type)
+            .field("join_schema", &self.join_schema)
+            .field("random_state", &self.random_state)
+            .field("metrics", &self.metrics)
+            .field("projection", &self.projection)
+            .field("column_indices", &self.column_indices)
+            .field("null_equality", &self.null_equality)
+            .field("cache", &self.cache)
+            // Explicitly exclude dynamic_filter to avoid runtime state differences in tests
+            .finish()
+    }
+}
+
+impl EmbeddedProjection for GraceHashJoinExec {
+    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
+        self.with_projection(projection)
+    }
+}
+
+impl GraceHashJoinExec {
+    /// Tries to create a new [GraceHashJoinExec].
+    ///
+    /// # Error
+    /// This function errors when it is not possible to join the left and right sides on keys `on`.
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: JoinOn,
+        filter: Option<JoinFilter>,
+        join_type: &JoinType,
+        projection: Option<Vec<usize>>,
+        null_equality: NullEquality,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        if on.is_empty() {
+            return plan_err!("On constraints in GraceHashJoinExec should be non-empty");
+        }
+        check_join_is_valid(&left_schema, &right_schema, &on)?;
+
+        let (join_schema, column_indices) =
+            build_join_schema(&left_schema, &right_schema, join_type);
+
+        let random_state = HASH_JOIN_SEED;
+
+        let join_schema = Arc::new(join_schema);
+
+        // check if the projection is valid
+        can_project(&join_schema, projection.as_ref())?;
+
+        let cache = Self::compute_properties(
+            &left,
+            &right,
+            Arc::clone(&join_schema),
+            *join_type,
+            &on,
+            projection.as_ref(),
+        )?;
+        let partitions = left.output_partitioning().partition_count();
+        let accumulator = GraceAccumulator::new(partitions);
+
+        let metrics = ExecutionPlanMetricsSet::new();
+        // The dynamic filter starts as `None`; it is set during filter
+        // pushdown if dynamic filtering is enabled.
+        Ok(GraceHashJoinExec {
+            left,
+            right,
+            on,
+            filter,
+            join_type: *join_type,
+            join_schema,
+            random_state,
+            metrics,
+            projection,
+            column_indices,
+            null_equality,
+            cache,
+            dynamic_filter: None,
+            accumulator,
+        })
+    }
+
+    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
+        // Extract the right-side (probe side) keys from the `on` clauses.
+        // The dynamic filter is built from build-side (left) values and applied to the probe side (right).
+        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
+        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + } + + /// left (build) side which gets hashed + pub fn left(&self) -> &Arc { + &self.left + } + + /// right (probe) side which are filtered by the hash table + pub fn right(&self) -> &Arc { + &self.right + } + + /// Set of common columns used to join on + pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] { + &self.on + } + + /// Filters applied before join output + pub fn filter(&self) -> Option<&JoinFilter> { + self.filter.as_ref() + } + + /// How the join is performed + pub fn join_type(&self) -> &JoinType { + &self.join_type + } + + /// The schema after join. Please be careful when using this schema, + /// if there is a projection, the schema isn't the same as the output schema. + pub fn join_schema(&self) -> &SchemaRef { + &self.join_schema + } + + /// Get null_equality + pub fn null_equality(&self) -> NullEquality { + self.null_equality + } + + /// Calculate order preservation flags for this hash join. + fn maintains_input_order(join_type: JoinType) -> Vec { + vec![ + false, + matches!( + join_type, + JoinType::Inner + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightSemi + | JoinType::RightMark + ), + ] + } + + /// Get probe side information for the hash join. + pub fn probe_side() -> JoinSide { + // In current implementation right side is always probe side. + JoinSide::Right + } + + /// Return whether the join contains a projection + pub fn contains_projection(&self) -> bool { + self.projection.is_some() + } + + /// Return new instance of [HashJoinExec] with the given projection. + pub fn with_projection(&self, projection: Option>) -> Result { + // check if the projection is valid + can_project(&self.schema(), projection.as_ref())?; + let projection = match projection { + Some(projection) => match &self.projection { + Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), + None => Some(projection), + }, + None => None, + }; + Self::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + &self.join_type, + projection, + self.null_equality, + ) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + left: &Arc, + right: &Arc, + schema: SchemaRef, + join_type: JoinType, + on: JoinOnRef, + projection: Option<&Vec>, + ) -> Result { + // Calculate equivalence properties: + let mut eq_properties = join_equivalence_properties( + left.equivalence_properties().clone(), + right.equivalence_properties().clone(), + &join_type, + Arc::clone(&schema), + &Self::maintains_input_order(join_type), + Some(Self::probe_side()), + on, + )?; + + let mut output_partitioning = + symmetric_join_output_partitioning(left, right, &join_type)?; + let emission_type = if left.boundedness().is_unbounded() { + EmissionType::Final + } else if right.pipeline_behavior() == EmissionType::Incremental { + match join_type { + // If we only need to generate matched rows from the probe side, + // we can emit rows incrementally. + JoinType::Inner + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark => EmissionType::Incremental, + // If we need to generate unmatched rows from the *build side*, + // we need to emit them at the end. 
+                JoinType::Left
+                | JoinType::LeftAnti
+                | JoinType::LeftMark
+                | JoinType::Full => EmissionType::Both,
+            }
+        } else {
+            right.pipeline_behavior()
+        };
+
+        // If the join contains a projection, update the PlanProperties.
+        if let Some(projection) = projection {
+            // Construct a map from the input expressions to the output expressions of the projection
+            let projection_mapping =
+                ProjectionMapping::from_indices(projection, &schema)?;
+            let out_schema = project_schema(&schema, Some(projection))?;
+            output_partitioning =
+                output_partitioning.project(&projection_mapping, &eq_properties);
+            eq_properties = eq_properties.project(&projection_mapping, out_schema);
+        }
+
+        Ok(PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            emission_type,
+            boundedness_from_children([left, right]),
+        ))
+    }
+
+    /// Returns a new `ExecutionPlan` that computes the same join as this one,
+    /// with the left and right inputs swapped using the specified
+    /// `partition_mode`.
+    ///
+    /// # Notes:
+    ///
+    /// This function is public so other downstream projects can use it to
+    /// construct a `GraceHashJoinExec` with the right side as the build side.
+    ///
+    /// When using this interface directly, keep the following in mind:
+    ///
+    /// Hash join execution may require specific input partitioning (for example,
+    /// the left child may have a single partition while the right child has multiple).
+    ///
+    /// Calling this function on join nodes whose children have already been repartitioned
+    /// (e.g., after a `RepartitionExec` has been inserted) may break the partitioning
+    /// requirements of the hash join. Therefore, ensure you call this function
+    /// before inserting any repartitioning operators on the join's children.
+    ///
+    /// In DataFusion's default SQL interface, this function is used by the `JoinSelection`
+    /// physical optimizer rule to determine a good join order, which is
+    /// executed before the `EnforceDistribution` rule (the rule that may
+    /// insert `RepartitionExec` operators).
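+    ///
+    /// Note: `GraceHashJoinExec` currently ignores the `partition_mode`
+    /// argument (both inputs are always hash-partitioned and spilled); it is
+    /// accepted only for signature compatibility with `HashJoinExec`.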
+ pub fn swap_inputs( + &self, + _partition_mode: PartitionMode, + ) -> Result> { + let left = self.left(); + let right = self.right(); + let new_join = GraceHashJoinExec::try_new( + Arc::clone(right), + Arc::clone(left), + self.on() + .iter() + .map(|(l, r)| (Arc::clone(r), Arc::clone(l))) + .collect(), + self.filter().map(JoinFilter::swap), + &self.join_type().swap(), + swap_join_projection( + left.schema().fields().len(), + right.schema().fields().len(), + self.projection.as_ref(), + self.join_type(), + ), + self.null_equality(), + )?; + // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again + if matches!( + self.join_type(), + JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti + | JoinType::RightAnti + ) || self.projection.is_some() + { + Ok(Arc::new(new_join)) + } else { + reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema()) + } + } +} + +impl DisplayAs for GraceHashJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + let display_projections = if self.contains_projection() { + format!( + ", projection=[{}]", + self.projection + .as_ref() + .unwrap() + .iter() + .map(|index| format!( + "{}@{}", + self.join_schema.fields().get(*index).unwrap().name(), + index + )) + .collect::>() + .join(", ") + ) + } else { + "".to_string() + }; + let on = self + .on + .iter() + .map(|(c1, c2)| format!("({c1}, {c2})")) + .collect::>() + .join(", "); + write!( + f, + "GraceHashJoinExec: join_type={:?}, on=[{}]{}{}", + self.join_type, on, display_filter, display_projections, + ) + } + DisplayFormatType::TreeRender => { + let on = self + .on + .iter() + .map(|(c1, c2)| { + format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref())) + }) + .collect::>() + .join(", "); + + if *self.join_type() != JoinType::Inner { + writeln!(f, "join_type={:?}", self.join_type)?; + } + + writeln!(f, "on={on}")?; + + if let Some(filter) = self.filter.as_ref() { + writeln!(f, "filter={filter}")?; + } + + Ok(()) + } + } + } +} + +impl ExecutionPlan for GraceHashJoinExec { + fn name(&self) -> &'static str { + "GraceHashJoinExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + let (left_expr, right_expr) = self + .on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); + vec![ + Distribution::HashPartitioned(left_expr), + Distribution::HashPartitioned(right_expr), + ] + } + + // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the probe phase initiates by + // applying the hash function to convert the join key(s) in each row into a hash value from the + // probe side table in the order they're arranged. The hash value is used to look up corresponding + // entries in the hash table that was constructed from the build side table during the build phase. + // + // Because of the immediate generation of result rows once a match is found, + // the output of the join tends to follow the order in which the rows were read from + // the probe side table. This is simply due to the sequence in which the rows were processed. + // Hence, it appears that the hash join is preserving the order of the probe side. 
+    //
+    // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
+    // the unmatched rows from the probe side are also kept in order.
+    // This is because the **`RightAnti`** join is designed to return rows from the right
+    // (probe side) table that have no match in the left (build side) table. Because the rows
+    // are processed sequentially in the probe phase, and unmatched rows are directly output
+    // as results, these results tend to retain the order of the probe side table.
+    fn maintains_input_order(&self) -> Vec<bool> {
+        Self::maintains_input_order(self.join_type)
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.left, &self.right]
+    }
+
+    /// Creates a new `GraceHashJoinExec` with different children while preserving configuration.
+    ///
+    /// This method is called during query optimization when the optimizer creates new
+    /// plan nodes. Importantly, it recomputes the plan properties via `compute_properties`
+    /// rather than reusing the cached ones, because the children's partitioning may have changed.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(GraceHashJoinExec {
+            left: Arc::clone(&children[0]),
+            right: Arc::clone(&children[1]),
+            on: self.on.clone(),
+            filter: self.filter.clone(),
+            join_type: self.join_type,
+            join_schema: Arc::clone(&self.join_schema),
+            random_state: self.random_state.clone(),
+            metrics: ExecutionPlanMetricsSet::new(),
+            projection: self.projection.clone(),
+            column_indices: self.column_indices.clone(),
+            null_equality: self.null_equality,
+            cache: Self::compute_properties(
+                &children[0],
+                &children[1],
+                Arc::clone(&self.join_schema),
+                self.join_type,
+                &self.on,
+                self.projection.as_ref(),
+            )?,
+            // Keep the dynamic filter; its bounds accumulator is re-initialized lazily
+            dynamic_filter: self.dynamic_filter.clone(),
+            accumulator: Arc::clone(&self.accumulator),
+        }))
+    }
+
+    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(GraceHashJoinExec {
+            left: Arc::clone(&self.left),
+            right: Arc::clone(&self.right),
+            on: self.on.clone(),
+            filter: self.filter.clone(),
+            join_type: self.join_type,
+            join_schema: Arc::clone(&self.join_schema),
+            random_state: self.random_state.clone(),
+            metrics: ExecutionPlanMetricsSet::new(),
+            projection: self.projection.clone(),
+            column_indices: self.column_indices.clone(),
+            null_equality: self.null_equality,
+            cache: self.cache.clone(),
+            // Reset dynamic filter and bounds accumulator to their initial state
+            dynamic_filter: None,
+            accumulator: Arc::clone(&self.accumulator),
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let left_partitions = self.left.output_partitioning().partition_count();
+        let right_partitions = self.right.output_partitioning().partition_count();
+
+        if left_partitions != right_partitions {
+            return internal_err!(
+                "Invalid GraceHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
+                 consider using RepartitionExec"
+            );
+        }
+
+        let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
+
+        let join_metrics = Arc::new(BuildProbeJoinMetrics::new(partition, &self.metrics));
+
+        let left = self.left.execute(partition, Arc::clone(&context))?;
+        let left_schema = Arc::clone(&self.left.schema());
+        let on_left = self
+            .on
+            .iter()
+            .map(|(left_expr, _)| Arc::clone(left_expr))
+            .collect::<Vec<_>>();
+
+        let right = self.right.execute(partition, Arc::clone(&context))?;
+        let right_schema = Arc::clone(&self.right.schema());
+        let on_right = self
+            .on
+            .iter()
+            .map(|(_, right_expr)| Arc::clone(right_expr))
+            .collect::<Vec<_>>();
+
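+        // One SpillManager per input side: the build and probe inputs have
+        // different schemas, so each side's Grace partitions are spilled and
+        // reloaded with the matching schema, while both register their spill
+        // metrics against this operator's metrics set.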
let spill_left = Arc::new(SpillManager::new( + Arc::clone(&context.runtime_env()), + SpillMetrics::new(&self.metrics, partition), + Arc::clone(&left_schema), + )); + let spill_right = Arc::new(SpillManager::new( + Arc::clone(&context.runtime_env()), + SpillMetrics::new(&self.metrics, partition), + Arc::clone(&right_schema), + )); + + // update column indices to reflect the projection + let column_indices_after_projection = match &self.projection { + Some(projection) => projection + .iter() + .map(|i| self.column_indices[*i].clone()) + .collect(), + None => self.column_indices.clone(), + }; + + let random_state = self.random_state.clone(); + let on = self.on.clone(); + let spill_left_clone = Arc::clone(&spill_left); + let spill_right_clone = Arc::clone(&spill_right); + let accumulator_clone = Arc::clone(&self.accumulator); + let join_metrics_clone = Arc::clone(&join_metrics); + let spill_fut = OnceFut::new(async move { + let (left_idx, right_idx) = partition_and_spill( + random_state, + on, + left, + right, + join_metrics_clone, + enable_dynamic_filter_pushdown, + left_partitions, + spill_left_clone, + spill_right_clone, + partition, + ) + .await?; + accumulator_clone + .report_partition(partition, left_idx.clone(), right_idx.clone()) + .await; + Ok(SpillFut::new(partition, left_idx, right_idx)) + }); + + Ok(Box::pin(GraceHashJoinStream::new( + self.schema(), + spill_fut, + spill_left, + spill_right, + on_left, + on_right, + self.projection.clone(), + self.filter.clone(), + self.join_type, + column_indices_after_projection, + join_metrics, + context, + Arc::clone(&self.accumulator), + ))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } + let stats = estimate_join_statistics( + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, + self.on.clone(), + &self.join_type, + &self.join_schema, + )?; + // Project statistics if there is a projection + Ok(stats.project(self.projection.as_ref())) + } + + /// Tries to push `projection` down through `hash_join`. If possible, performs the + /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections + /// as its children. Otherwise, returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // TODO: currently if there is projection in GraceHashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later. + if self.contains_projection() { + return Ok(None); + } + + if let Some(JoinData { + projected_left_child, + projected_right_child, + join_filter, + join_on, + }) = try_pushdown_through_join( + projection, + self.left(), + self.right(), + self.on(), + self.schema(), + self.filter(), + )? { + Ok(Some(Arc::new(GraceHashJoinExec::try_new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_on, + join_filter, + self.join_type(), + // Returned early if projection is not None + None, + self.null_equality, + )?))) + } else { + try_embed_projection(projection, self) + } + } + + fn gather_filters_for_pushdown( + &self, + phase: FilterPushdownPhase, + parent_filters: Vec>, + config: &ConfigOptions, + ) -> Result { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. 
+ // For now we don't support them. + // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + // See https://github.com/apache/datafusion/issues/16973 for tracking. + if self.join_type != JoinType::Inner { + return Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )); + } + + // Get basic filter descriptions for both children + let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.left(), + )?; + let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.right(), + )?; + + // Add dynamic filters in Post phase if enabled + if matches!(phase, FilterPushdownPhase::Post) + && config.optimizer.enable_dynamic_filter_pushdown + { + // Add actual dynamic filter to right side (probe side) + let dynamic_filter = Self::create_dynamic_filter(&self.on); + right_child = right_child.with_self_filter(dynamic_filter); + } + + Ok(FilterDescription::new() + .with_child(left_child) + .with_child(right_child)) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for + // non-inner joins in `gather_filters_for_pushdown`. + // However it's a cheap check and serves to inform future devs touching this function that they need to be really + // careful pushing down filters through non-inner joins. + if self.join_type != JoinType::Inner { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. + // For now we don't support them. + // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + return Ok(FilterPushdownPropagation::all_unsupported( + child_pushdown_result, + )); + } + + let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); + assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children + let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child + // We expect 0 or 1 self filters + if let Some(filter) = right_child_self_filters.first() { + // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said + // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating + let predicate = Arc::clone(&filter.predicate); + if let Ok(dynamic_filter) = + Arc::downcast::(predicate) + { + // We successfully pushed down our self filter - we need to make a new node with the dynamic filter + let new_node = Arc::new(GraceHashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + random_state: self.random_state.clone(), + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + dynamic_filter: Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }), + accumulator: Arc::clone(&self.accumulator), + }); + result = result.with_updated_node(new_node as Arc); + } + } + Ok(result) + } +} + + +#[allow(clippy::too_many_arguments)] +pub 
async fn partition_and_spill( + random_state: RandomState, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + mut left_stream: SendableRecordBatchStream, + mut right_stream: SendableRecordBatchStream, + join_metrics: Arc, + enable_dynamic_filter_pushdown: bool, + partition_count: usize, + spill_left: Arc, + spill_right: Arc, + partition: usize, +) -> Result<(Vec, Vec)> { + let on_left: Vec<_> = on.iter().map(|(l, _)| Arc::clone(l)).collect(); + let on_right: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); + + // LEFT side partitioning + let left_index = partition_and_spill_one_side( + &mut left_stream, + &on_left, + &random_state, + partition_count, + spill_left, + &format!("left_{partition}"), + &join_metrics, + enable_dynamic_filter_pushdown, + ) + .await?; + + // RIGHT side partitioning + let right_index = partition_and_spill_one_side( + &mut right_stream, + &on_right, + &random_state, + partition_count, + spill_right, + &format!("right_{partition}"), + &join_metrics, + enable_dynamic_filter_pushdown, + ) + .await?; + Ok((left_index, right_index)) +} + +#[allow(clippy::too_many_arguments)] +async fn partition_and_spill_one_side( + input: &mut SendableRecordBatchStream, + on_exprs: &[PhysicalExprRef], + random_state: &RandomState, + partition_count: usize, + spill_manager: Arc, + spilling_request_msg: &str, + join_metrics: &BuildProbeJoinMetrics, + _enable_dynamic_filter_pushdown: bool, +) -> Result> { + let mut partitions: Vec = (0..partition_count) + .map(|_| PartitionWriter::new(Arc::clone(&spill_manager))) + .collect(); + + let mut buffered_batches = Vec::new(); + + let schema = input.schema(); + while let Some(batch) = input.next().await { + let batch = batch?; + if batch.num_rows() == 0 { + continue; + } + join_metrics.build_input_batches.add(1); + join_metrics.build_input_rows.add(batch.num_rows()); + buffered_batches.push(batch); + } + if buffered_batches.is_empty() { + return Ok(Vec::new()); + } + // Create single batch to reduce number of spilled files + let single_batch = concat_batches(&schema, &buffered_batches)?; + let num_rows = single_batch.num_rows(); + if num_rows == 0 { + return Ok(Vec::new()); + } + + // Calculate hashes + let keys = on_exprs + .iter() + .map(|c| c.evaluate(&single_batch)?.into_array(num_rows)) + .collect::>>()?; + + let mut hashes = vec![0u64; num_rows]; + create_hashes(&keys, random_state, &mut hashes)?; + + // Spread to partitions + let mut indices: Vec> = vec![Vec::new(); partition_count]; + for (row, h) in hashes.iter().enumerate() { + let bucket = (*h as usize) % partition_count; + indices[bucket].push(row as u32); + } + + // Collect and spill + for (i, idxs) in indices.into_iter().enumerate() { + if idxs.is_empty() { + continue; + } + + let idx_array = UInt32Array::from(idxs); + let taken = single_batch + .columns() + .iter() + .map(|c| take(c.as_ref(), &idx_array, None)) + .collect::>>()?; + + let part_batch = RecordBatch::try_new(single_batch.schema(), taken)?; + // We need unique name for spilling + let request_msg = format!("grace_partition_{spilling_request_msg}_{i}"); + partitions[i].spill_batch_auto(&part_batch, &request_msg)?; + } + + // Prepare indexes + let mut result = Vec::with_capacity(partitions.len()); + for (i, writer) in partitions.into_iter().enumerate() { + result.push(writer.finish(i)?); + } + // println!("spill_manager {:?}", spill_manager.metrics); + Ok(result) +} + +#[derive(Debug)] +pub struct PartitionWriter { + spill_manager: Arc, + total_rows: usize, + total_bytes: usize, + chunks: Vec, +} + +impl 
PartitionWriter { + pub fn new(spill_manager: Arc) -> Self { + Self { + spill_manager, + total_rows: 0, + total_bytes: 0, + chunks: vec![], + } + } + + pub fn spill_batch_auto( + &mut self, + batch: &RecordBatch, + request_msg: &str, + ) -> Result<()> { + let loc = self.spill_manager.spill_batch_auto(batch, request_msg)?; + self.total_rows += batch.num_rows(); + self.total_bytes += get_record_batch_memory_size(batch); + self.chunks.push(loc); + Ok(()) + } + + pub fn finish(self, part_id: usize) -> Result { + Ok(PartitionIndex { + part_id, + chunks: self.chunks, + total_rows: self.total_rows, + total_bytes: self.total_bytes, + }) + } +} + +/// Describes a single partition of spilled data (used in GraceHashJoin). +/// +/// Each partition can consist of one or multiple chunks (batches) +/// that were spilled either to memory or to disk. +/// These chunks are later reloaded during the join phase. +/// +/// Example: +/// Partition 3 -> [ spill_chunk_3_0.arrow, spill_chunk_3_1.arrow ] +#[derive(Debug, Clone)] +pub struct PartitionIndex { + /// Unique partition identifier (0..N-1) + pub part_id: usize, + + /// Total number of rows in this partition + pub total_rows: usize, + + /// Total size in bytes of all batches in this partition + pub total_bytes: usize, + + /// Collection of spill locations (each corresponds to one batch written + /// by [`PartitionWriter::spill_batch_auto`]) + pub chunks: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::TestMemoryExec; + use crate::{ + common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, + }; + + use crate::joins::HashJoinExec; + use arrow::array::{ArrayRef, Int32Array}; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_physical_expr::Partitioning; + use futures::future; + + fn build_large_table( + a_name: &str, + b_name: &str, + c_name: &str, + n: usize, + ) -> Arc { + let a: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=n as i32)); + let b: ArrayRef = + Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 2))); + let c: ArrayRef = + Arc::new(Int32Array::from_iter_values((1..=n as i32).map(|x| x * 10))); + + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new( + a_name, + arrow::datatypes::DataType::Int32, + false, + ), + arrow::datatypes::Field::new( + b_name, + arrow::datatypes::DataType::Int32, + false, + ), + arrow::datatypes::Field::new( + c_name, + arrow::datatypes::DataType::Int32, + false, + ), + ])); + + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![a, b, c]).unwrap(); + Arc::new(TestMemoryExec::try_new(&[vec![batch]], schema, None).unwrap()) + } + + fn build_table( + a: (&str, &Vec), + b: (&str, &Vec), + c: (&str, &Vec), + ) -> Arc { + let batch = build_table_i32(a, b, c); + let schema = batch.schema(); + TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap() + } + + #[tokio::test] + async fn simple_grace_hash_join() -> Result<()> { + // let left = build_table( + // ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + // ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + // ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + // ); + // let right = build_table( + // ("a2", &vec![1, 2]), + // ("b2", &vec![1, 2]), + // ("c2", &vec![14, 15]), + // ); + let left = build_large_table("a1", "b1", "c1", 2000000); + let right = build_large_table("a2", "b2", "c2", 5000000); + let on = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) 
as _, + )]; + let (left_expr, right_expr): ( + Vec>, + Vec>, + ) = on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); + let left_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(left, Partitioning::Hash(left_expr, 32))?, + ); + let right_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(right, Partitioning::Hash(right_expr, 32))?, + ); + + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(500_000_000, 1.0) + .build_arc()?; + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + + let join = GraceHashJoinExec::try_new( + Arc::clone(&left_repartitioned), + Arc::clone(&right_repartitioned), + on.clone(), + None, + &JoinType::Inner, + None, + NullEquality::NullEqualsNothing, + )?; + + let partition_count = right_repartitioned.output_partitioning().partition_count(); + let tasks: Vec<_> = (0..partition_count) + .map(|i| { + let ctx = Arc::clone(&task_ctx); + let s = join.execute(i, ctx).unwrap(); + async move { common::collect(s).await } + }) + .collect(); + + let results = future::join_all(tasks).await; + let mut batches = Vec::new(); + for r in results { + let mut v = r?; + v.retain(|b| b.num_rows() > 0); + batches.extend(v); + } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + println!("TOTAL ROWS = {}", total_rows); + + // print_batches(&*batches).unwrap(); + // Asserting that operator-level reservation attempting to overallocate + // assert_contains!( + // err.to_string(), + // "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput" + // ); + // + // assert_contains!( + // err.to_string(), + // "Failed to allocate additional 120.0 B for HashJoinInput" + // ); + Ok(()) + } + + #[tokio::test] + async fn simple_hash_join() -> Result<()> { + let left = build_large_table("a1", "b1", "c1", 2000000); + let right = build_large_table("a2", "b2", "c2", 5000000); + let on = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) 
as _, + )]; + let (left_expr, right_expr): ( + Vec>, + Vec>, + ) = on + .iter() + .map(|(l, r)| (Arc::clone(l), Arc::clone(r))) + .unzip(); + let left_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(left, Partitioning::Hash(left_expr, 32))?, + ); + let right_repartitioned: Arc = Arc::new( + RepartitionExec::try_new(right, Partitioning::Hash(right_expr, 32))?, + ); + let partition_count = left_repartitioned.output_partitioning().partition_count(); + + let join = HashJoinExec::try_new( + left_repartitioned, + right_repartitioned, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + )?; + + let task_ctx = Arc::new(TaskContext::default()); + let mut batches = vec![]; + for i in 0..partition_count { + let stream = join.execute(i, Arc::clone(&task_ctx))?; + let more_batches = common::collect(stream).await?; + batches.extend( + more_batches + .into_iter() + .filter(|b| b.num_rows() > 0) + .collect::>(), + ); + } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + println!("TOTAL ROWS = {}", total_rows); + + // print_batches(&*batches).unwrap(); + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs b/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs new file mode 100644 index 000000000000..55d7e2035e6c --- /dev/null +++ b/datafusion/physical-plan/src/joins/grace_hash_join/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GraceHashJoinExec`] Partitioned Hash Join Operator + +pub use exec::GraceHashJoinExec; + +mod exec; +mod stream; diff --git a/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs new file mode 100644 index 000000000000..d028b0e8bcf8 --- /dev/null +++ b/datafusion/physical-plan/src/joins/grace_hash_join/stream.rs @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Stream implementation for Hash Join +//! +//! 
This module implements [`HashJoinStream`], the streaming engine for +//! [`super::HashJoinExec`]. See comments in [`HashJoinStream`] for more details. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::joins::utils::OnceFut; +use crate::{ + joins::utils::{BuildProbeJoinMetrics, ColumnIndex, JoinFilter}, + ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, SpillManager, +}; + +use crate::empty::EmptyExec; +use crate::joins::grace_hash_join::exec::PartitionIndex; +use crate::joins::{HashJoinExec, PartitionMode}; +use crate::test::TestMemoryExec; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{JoinType, NullEquality, Result}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExprRef; +use futures::{ready, Stream, StreamExt}; +use tokio::sync::Mutex; + +enum GraceJoinState { + /// Waiting for the partitioning phase (Phase 1) to finish + WaitPartitioning, + + WaitAllPartitions { + wait_all_fut: Option>>, + }, + + /// Currently joining partition `current` + JoinPartition { + current: usize, + all_parts: Arc>, + current_stream: Option, + left_fut: Option>>, + right_fut: Option>>, + }, + + Done, +} + +pub struct GraceHashJoinStream { + schema: SchemaRef, + spill_fut: OnceFut, + spill_left: Arc, + spill_right: Arc, + on_left: Vec, + on_right: Vec, + projection: Option>, + filter: Option, + join_type: JoinType, + column_indices: Vec, + join_metrics: Arc, + context: Arc, + accumulator: Arc, + state: GraceJoinState, +} + +#[derive(Debug, Clone)] +pub struct SpillFut { + partition: usize, + left: Vec, + right: Vec, +} +impl SpillFut { + pub(crate) fn new( + partition: usize, + left: Vec, + right: Vec, + ) -> Self { + SpillFut { + partition, + left, + right, + } + } +} + +impl RecordBatchStream for GraceHashJoinStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl GraceHashJoinStream { + pub fn new( + schema: SchemaRef, + spill_fut: OnceFut, + spill_left: Arc, + spill_right: Arc, + on_left: Vec, + on_right: Vec, + projection: Option>, + filter: Option, + join_type: JoinType, + column_indices: Vec, + join_metrics: Arc, + context: Arc, + accumulator: Arc, + ) -> Self { + Self { + schema, + spill_fut, + spill_left, + spill_right, + on_left, + on_right, + projection, + filter, + join_type, + column_indices, + join_metrics, + context, + accumulator, + state: GraceJoinState::WaitPartitioning, + } + } + + /// Core state machine logic (poll implementation) + fn poll_next_impl( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + match &mut self.state { + GraceJoinState::WaitPartitioning => { + let shared = ready!(self.spill_fut.get_shared(cx))?; + + let acc = Arc::clone(&self.accumulator); + let left = shared.left.clone(); + let right = shared.right.clone(); + // Use 0 partition as the main + let wait_all_fut = if shared.partition == 0 { + OnceFut::new(async move { + acc.report_partition(shared.partition, left, right).await; + let all = acc.wait_all().await; + Ok(all) + }) + } else { + OnceFut::new(async move { + acc.report_partition(shared.partition, left, right).await; + acc.wait_ready().await; + Ok(vec![]) + }) + }; + self.state = GraceJoinState::WaitAllPartitions { + wait_all_fut: Some(wait_all_fut), + }; + continue; + } + GraceJoinState::WaitAllPartitions { wait_all_fut } => { + if let Some(fut) = wait_all_fut { + let all_arc = ready!(fut.get_shared(cx))?; + let mut all = (*all_arc).clone(); + all.sort_by_key(|s| s.partition); + 
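+                        // Sort by partition id so that every output stream
+                        // walks the Grace partitions in one deterministic order.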
+ self.state = GraceJoinState::JoinPartition { + current: 0, + all_parts: Arc::from(all), + current_stream: None, + left_fut: None, + right_fut: None, + }; + continue; + } else { + return Poll::Pending; + } + } + GraceJoinState::JoinPartition { + current, + all_parts, + current_stream, + left_fut, + right_fut, + } => { + if *current >= all_parts.len() { + self.state = GraceJoinState::Done; + continue; + } + + // If we don't have a stream yet, create one for the current partition pair + if current_stream.is_none() { + if left_fut.is_none() && right_fut.is_none() { + let spill_fut = &all_parts[*current]; + *left_fut = Some(load_partition_async( + Arc::clone(&self.spill_left), + spill_fut.left.clone(), + )); + *right_fut = Some(load_partition_async( + Arc::clone(&self.spill_right), + spill_fut.right.clone(), + )); + } + + let left_batches = + (*ready!(left_fut.as_mut().unwrap().get_shared(cx))?).clone(); + let right_batches = + (*ready!(right_fut.as_mut().unwrap().get_shared(cx))?) + .clone(); + + let stream = build_in_memory_join_stream( + Arc::clone(&self.schema), + left_batches, + right_batches, + &self.on_left, + &self.on_right, + self.projection.clone(), + self.filter.clone(), + self.join_type, + &self.column_indices, + &self.join_metrics, + &self.context, + )?; + + *current_stream = Some(stream); + *left_fut = None; + *right_fut = None; + } + + // Drive current stream forward + if let Some(stream) = current_stream { + match ready!(stream.poll_next_unpin(cx)) { + Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))), + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + *current += 1; + *current_stream = None; + continue; + } + } + } + } + GraceJoinState::Done => return Poll::Ready(None), + } + } + } +} + +fn load_partition_async( + spill_manager: Arc, + partitions: Vec, +) -> OnceFut> { + OnceFut::new(async move { + let mut all_batches = Vec::new(); + + for p in partitions { + for chunk in p.chunks { + let mut reader = spill_manager.load_spilled_batch(&chunk)?; + while let Some(batch_result) = reader.next().await { + let batch = batch_result?; + all_batches.push(batch); + } + } + } + Ok(all_batches) + }) +} + +/// Build an in-memory HashJoinExec for one pair of spilled partitions +fn build_in_memory_join_stream( + output_schema: SchemaRef, + left_batches: Vec, + right_batches: Vec, + on_left: &[PhysicalExprRef], + on_right: &[PhysicalExprRef], + projection: Option>, + filter: Option, + join_type: JoinType, + _column_indices: &[ColumnIndex], + _join_metrics: &BuildProbeJoinMetrics, + context: &Arc, +) -> Result { + if left_batches.is_empty() && right_batches.is_empty() { + return EmptyExec::new(output_schema).execute(0, Arc::clone(context)); + } + + let left_schema = left_batches + .first() + .map(|b| b.schema()) + .unwrap_or_else(|| Arc::new(Schema::empty())); + + let right_schema = right_batches + .first() + .map(|b| b.schema()) + .unwrap_or_else(|| Arc::new(Schema::empty())); + + // Build memory execution nodes for each side + let left_plan: Arc = + Arc::new(TestMemoryExec::try_new(&[left_batches], left_schema, None)?); + let right_plan: Arc = Arc::new(TestMemoryExec::try_new( + &[right_batches], + right_schema, + None, + )?); + + // Combine join expressions into pairs + let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = on_left + .iter() + .cloned() + .zip(on_right.iter().cloned()) + .collect(); + + // For one partition pair: always CollectLeft (build left, stream right) + let join_exec = HashJoinExec::try_new( + left_plan, + right_plan, + on, + filter, + &join_type, + 
        projection,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNothing,
+    )?;
+
+    // Each join executes locally with the same context
+    join_exec.execute(0, Arc::clone(context))
+}
+
+impl Stream for GraceHashJoinStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+/// Barrier that collects each partition's spilled partition indexes and
+/// wakes all waiting streams once every expected partition has reported.
+#[derive(Debug)]
+pub struct GraceAccumulator {
+    expected: usize,
+    collected: Mutex<Vec<SpillFut>>,
+    notify: tokio::sync::Notify,
+}
+
+impl GraceAccumulator {
+    pub fn new(expected: usize) -> Arc<Self> {
+        Arc::new(Self {
+            expected,
+            collected: Mutex::new(vec![]),
+            notify: tokio::sync::Notify::new(),
+        })
+    }
+
+    pub async fn report_partition(
+        &self,
+        part_id: usize,
+        left_idx: Vec<PartitionIndex>,
+        right_idx: Vec<PartitionIndex>,
+    ) {
+        let mut guard = self.collected.lock().await;
+        if let Some(pos) = guard.iter().position(|s| s.partition == part_id) {
+            guard[pos] = SpillFut::new(part_id, left_idx, right_idx);
+        } else {
+            guard.push(SpillFut::new(part_id, left_idx, right_idx));
+        }
+
+        if guard.len() == self.expected {
+            self.notify.notify_waiters();
+        }
+    }
+
+    pub async fn wait_all(&self) -> Vec<SpillFut> {
+        loop {
+            {
+                let guard = self.collected.lock().await;
+                if guard.len() == self.expected {
+                    return guard.clone();
+                }
+            }
+            self.notify.notified().await;
+        }
+    }
+
+    pub async fn wait_ready(&self) {
+        loop {
+            {
+                let guard = self.collected.lock().await;
+                if guard.len() == self.expected {
+                    return;
+                }
+            }
+            self.notify.notified().await;
+        }
+    }
+}
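`GraceAccumulator` is a small report-and-wait barrier. Below is a self-contained sketch of the same pattern with plain tokio primitives, using a counter instead of `SpillFut`s (the `Barrier` name is illustrative). Note that the sketch enables the `notified()` future *before* re-checking the count, which closes the missed-wakeup window between the check and the wait:

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};

struct Barrier {
    expected: usize,
    count: Mutex<usize>,
    notify: Notify,
}

impl Barrier {
    async fn report(&self) {
        let mut n = self.count.lock().await;
        *n += 1;
        if *n == self.expected {
            // Wake everyone currently waiting on `notified()`.
            self.notify.notify_waiters();
        }
    }

    async fn wait_ready(&self) {
        loop {
            // Register interest before checking, so a notify_waiters() that
            // fires between the check and the await is not lost.
            let notified = self.notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            if *self.count.lock().await == self.expected {
                return;
            }
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let b = Arc::new(Barrier {
        expected: 2,
        count: Mutex::new(0),
        notify: Notify::new(),
    });
    let b2 = Arc::clone(&b);
    let waiter = tokio::spawn(async move { b2.wait_ready().await });
    b.report().await;
    b.report().await;
    waiter.await.unwrap();
}
```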
diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs
index 7f1e5cae13a3..612134604c7b 100644
--- a/datafusion/physical-plan/src/joins/hash_join/mod.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,5 @@
 pub use exec::HashJoinExec;
 
 mod exec;
-mod shared_bounds;
+pub mod shared_bounds;
 mod stream;
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs
index 1d36db996434..8ee4c3de430a 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -27,7 +27,8 @@ use parking_lot::Mutex;
 pub use sort_merge_join::SortMergeJoinExec;
 pub use symmetric_hash_join::SymmetricHashJoinExec;
 mod cross_join;
+mod grace_hash_join;
 mod hash_join;
 mod nested_loop_join;
 mod sort_merge_join;
 mod stream_join_utils;
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index d392650f88dd..1c49454f1a03 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1663,7 +1663,7 @@ pub fn update_hash(
     hashes_buffer: &mut Vec<u64>,
     deleted_offset: usize,
     fifo_hashmap: bool,
-) -> Result<()> {
+) -> Result<Vec<ArrayRef>> {
     // evaluate the keys
     let keys_values = on
         .iter()
@@ -1688,7 +1688,7 @@ pub fn update_hash(
         hash_map.update_from_iter(Box::new(hash_values_iter), deleted_offset);
     }
 
-    Ok(())
+    Ok(keys_values)
 }
 
 pub(super) fn equal_rows_arr(
diff --git a/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
new file mode 100644
index 000000000000..bba0f6f95625
--- /dev/null
+++ b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::memory::MemoryStream;
+use crate::spill::spill_manager::GetSlicedSize;
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::SendableRecordBatchStream;
+use std::sync::Arc;
+
+/// A spilled "file" kept in memory: a set of `RecordBatch`es plus their
+/// total sliced size, used when the memory pool still has headroom.
+#[derive(Debug)]
+pub struct InMemorySpillBuffer {
+    batches: Vec<RecordBatch>,
+    total_bytes: usize,
+}
+
+impl InMemorySpillBuffer {
+    pub fn from_batch(batch: &RecordBatch) -> Result<Self> {
+        Ok(Self {
+            batches: vec![batch.clone()],
+            total_bytes: batch.get_sliced_size()?,
+        })
+    }
+
+    pub fn from_batches(batches: &[RecordBatch]) -> Result<Self> {
+        let mut total_bytes = 0;
+        let mut owned = Vec::with_capacity(batches.len());
+        for b in batches {
+            total_bytes += b.get_sliced_size()?;
+            owned.push(b.clone());
+        }
+        Ok(Self {
+            batches: owned,
+            total_bytes,
+        })
+    }
+
+    pub fn as_stream(
+        self: Arc<Self>,
+        schema: SchemaRef,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = MemoryStream::try_new(self.batches.clone(), schema, None)?;
+        Ok(Box::pin(stream))
+    }
+
+    pub fn size(&self) -> usize {
+        self.total_bytes
+    }
+}
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index fab62bff840f..782100e6d4cf 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -17,6 +17,7 @@
 
 //! Defines the spilling functions
 
+pub(crate) mod in_memory_spill_buffer;
 pub(crate) mod in_progress_spill_file;
 pub(crate) mod spill_manager;
 
@@ -376,16 +377,17 @@ mod tests {
     use crate::common::collect;
     use crate::metrics::ExecutionPlanMetricsSet;
     use crate::metrics::SpillMetrics;
-    use crate::spill::spill_manager::SpillManager;
+    use crate::spill::spill_manager::{SpillLocation, SpillManager};
     use crate::test::build_table_i32;
     use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray};
     use arrow::compute::cast;
     use arrow::datatypes::{DataType, Field, Int32Type, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
-    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
     use futures::StreamExt as _;
 
+    use datafusion_execution::memory_pool::{FairSpillPool, MemoryPool};
     use std::sync::Arc;
 
     #[tokio::test]
@@ -426,6 +428,71 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_batch_spill_to_memory_and_disk_and_read() -> Result<()> {
+        let schema: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+
+        let batch1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from_iter_values(0..1000)),
+                Arc::new(Int32Array::from_iter_values(1000..2000)),
+            ],
+        )?;
+
+        let batch2 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from_iter_values(2000..4000)),
+                Arc::new(Int32Array::from_iter_values(4000..6000)),
+            ],
+        )?;
+
+        let num_rows = batch1.num_rows() + batch2.num_rows();
+        let batches = vec![batch1, batch2];
+
+        // Create a small memory pool (20 KiB) to simulate memory pressure
+        let memory_limit_bytes = 20 * 1024;
+        let memory_pool: Arc<dyn MemoryPool> =
+            Arc::new(FairSpillPool::new(memory_limit_bytes));
+
+        // Construct SpillManager
+        let env = RuntimeEnvBuilder::new()
+            .with_memory_pool(memory_pool)
+            .build_arc()?;
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
+
+        let results = spill_manager.spill_batches_auto(&batches, "TestAutoSpill")?;
+        assert_eq!(results.len(), 2);
+
+        let mem_count = results
+            .iter()
+            .filter(|r| matches!(r, SpillLocation::Memory(_)))
+            .count();
+        let disk_count = results
+            .iter()
+            .filter(|r| matches!(r, SpillLocation::Disk(_)))
+            .count();
+        assert!(mem_count >= 1);
+        assert!(disk_count >= 1);
+
+        let spilled_rows = spill_manager.metrics.spilled_rows.value();
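+        // Rows spilled to the in-memory buffer and to disk are both counted
+        // by the spill metrics, so the total matches the input row count.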
+        assert_eq!(spilled_rows, num_rows);
+
+        for spill in results {
+            let stream = spill_manager.load_spilled_batch(&spill)?;
+            let collected = collect(stream).await?;
+            assert!(!collected.is_empty());
+            assert_eq!(collected[0].schema(), schema);
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
         // See https://github.com/apache/datafusion/issues/4658
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index ad23bd66a021..d3c38aba4598 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -21,15 +21,17 @@ use arrow::array::StringViewArray;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
+use std::slice;
 use std::sync::Arc;
 
-use datafusion_common::{config::SpillCompression, Result};
+use datafusion_common::{config::SpillCompression, DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::SendableRecordBatchStream;
 
 use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
 use crate::coop::cooperative;
 use crate::{common::spawn_buffered, metrics::SpillMetrics};
+use crate::spill::in_memory_spill_buffer::InMemorySpillBuffer;
 
 /// The `SpillManager` is responsible for the following tasks:
 /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -168,6 +170,46 @@ impl SpillManager {
         Ok(file.map(|f| (f, max_record_batch_size)))
     }
 
+    /// Automatically decides whether to keep the given `RecordBatch` in memory
+    /// or spill it to disk, depending on available memory pool capacity.
+    pub(crate) fn spill_batch_auto(
+        &self,
+        batch: &RecordBatch,
+        request_msg: &str,
+    ) -> Result<SpillLocation> {
+        let size = batch.get_sliced_size()?;
+
+        // Check current memory usage and the total limit of the runtime memory pool
+        let used = self.env.memory_pool.reserved();
+        let limit = match self.env.memory_pool.memory_limit() {
+            datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
+            _ => usize::MAX,
+        };
+
+        // If there is enough headroom (with a 1.5x safety margin), keep the batch in memory
+        if used + size * 3 / 2 <= limit {
+            let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
+            self.metrics.spilled_bytes.add(size);
+            self.metrics.spilled_rows.add(batch.num_rows());
+            Ok(SpillLocation::Memory(buf))
+        } else {
+            // Otherwise spill to disk using the existing SpillManager logic
+            let Some(file) = self
+                .spill_record_batch_and_finish(slice::from_ref(batch), request_msg)?
+            else {
+                return Err(DataFusionError::Execution(
+                    "failed to spill batch to disk".into(),
+                ));
+            };
+            Ok(SpillLocation::Disk(Arc::new(file)))
+        }
+    }
+
+    /// Spills every batch via [`Self::spill_batch_auto`], returning one
+    /// [`SpillLocation`] per input batch.
+    pub fn spill_batches_auto(
+        &self,
+        batches: &[RecordBatch],
+        request_msg: &str,
+    ) -> Result<Vec<SpillLocation>> {
+        let mut result = Vec::with_capacity(batches.len());
+        for batch in batches {
+            result.push(self.spill_batch_auto(batch, request_msg)?);
+        }
+        Ok(result)
+    }
+
     /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
     /// This method will generate output in FIFO order: the batch appended first
     /// will be read first.
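The placement rule in `spill_batch_auto` is a simple headroom check: keep the batch in memory only if current pool usage plus 1.5x the batch's size stays under the limit. A tiny standalone sketch of just that arithmetic (the `fits_in_memory` name is illustrative, not DataFusion API):

```rust
// Stay in memory only if used + 1.5 * batch_size fits under the pool limit.
fn fits_in_memory(used: usize, batch_size: usize, limit: usize) -> bool {
    used + batch_size.saturating_mul(3) / 2 <= limit
}

fn main() {
    // 20 KiB pool, 8 KiB reserved, 6 KiB batch: 8 + 9 = 17 KiB fits.
    assert!(fits_in_memory(8 * 1024, 6 * 1024, 20 * 1024));
    // A 10 KiB batch needs 15 KiB of headroom, so it goes to disk instead.
    assert!(!fits_in_memory(8 * 1024, 10 * 1024, 20 * 1024));
}
```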
@@ -182,8 +232,37 @@ impl SpillManager {
         Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
     }
 
+    /// Reads a spill file as a stream without consuming the file handle: the
+    /// underlying temp file is reopened via
+    /// [`RefCountedTempFile::clone_refcounted`], so the same spilled
+    /// partition can be read more than once.
+    pub fn read_spill_as_stream_ref(
+        &self,
+        spill_file: &RefCountedTempFile,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = Box::pin(cooperative(SpillReaderStream::new(
+            Arc::clone(&self.schema),
+            spill_file.clone_refcounted()?,
+        )));
+
+        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
+    }
+
+    /// Opens a stream over a spilled batch, wherever it lives.
+    pub fn load_spilled_batch(
+        &self,
+        spill: &SpillLocation,
+    ) -> Result<SendableRecordBatchStream> {
+        match spill {
+            SpillLocation::Memory(buf) => {
+                Arc::clone(buf).as_stream(Arc::clone(&self.schema))
+            }
+            SpillLocation::Disk(file) => self.read_spill_as_stream_ref(file),
+        }
+    }
 }
 
+/// Where a spilled batch lives: in an in-memory buffer when the pool has
+/// headroom, or in a temp file on disk.
+#[derive(Debug, Clone)]
+pub enum SpillLocation {
+    Memory(Arc<InMemorySpillBuffer>),
+    Disk(Arc<RefCountedTempFile>),
+}
+
 pub(crate) trait GetSlicedSize {
     /// Returns the size of the `RecordBatch` when sliced.
     /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
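For reference, `load_spilled_batch` makes memory- and disk-resident spills interchangeable for callers. A minimal sketch of that uniform dispatch, assuming it lives inside the `datafusion-physical-plan` crate next to `SpillManager` (the `replay_spills` helper name is invented for illustration):

```rust
use crate::spill::spill_manager::{SpillLocation, SpillManager};
use datafusion_common::Result;
use datafusion_execution::SendableRecordBatchStream;

/// Reload every chunk of a spilled Grace partition, wherever it ended up.
fn replay_spills(
    spill_manager: &SpillManager,
    locations: &[SpillLocation],
) -> Result<Vec<SendableRecordBatchStream>> {
    locations
        .iter()
        .map(|loc| spill_manager.load_spilled_batch(loc))
        .collect()
}
```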