diff --git a/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs new file mode 100644 index 000000000000..bba0f6f95625 --- /dev/null +++ b/datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs @@ -0,0 +1,46 @@ +use crate::memory::MemoryStream; +use crate::spill::spill_manager::GetSlicedSize; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::SendableRecordBatchStream; +use std::sync::Arc; + +#[derive(Debug)] +pub struct InMemorySpillBuffer { + batches: Vec, + total_bytes: usize, +} + +impl InMemorySpillBuffer { + pub fn from_batch(batch: &RecordBatch) -> Result { + Ok(Self { + batches: vec![batch.clone()], + total_bytes: batch.get_sliced_size()?, + }) + } + + pub fn from_batches(batches: &[RecordBatch]) -> Result { + let mut total_bytes = 0; + let mut owned = Vec::with_capacity(batches.len()); + for b in batches { + total_bytes += b.get_sliced_size()?; + owned.push(b.clone()); + } + Ok(Self { + batches: owned, + total_bytes, + }) + } + + pub fn as_stream( + self: Arc, + schema: Arc, + ) -> Result { + let stream = MemoryStream::try_new(self.batches.clone(), schema, None)?; + Ok(Box::pin(stream)) + } + + pub fn size(&self) -> usize { + self.total_bytes + } +} diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index fab62bff840f..270b3654b2ba 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -19,6 +19,7 @@ pub(crate) mod in_progress_spill_file; pub(crate) mod spill_manager; +pub(crate) mod in_memory_spill_buffer; use std::fs::File; use std::io::BufReader; @@ -376,17 +377,18 @@ mod tests { use crate::common::collect; use crate::metrics::ExecutionPlanMetricsSet; use crate::metrics::SpillMetrics; - use crate::spill::spill_manager::SpillManager; + use crate::spill::spill_manager::{SpillLocation, SpillManager}; use crate::test::build_table_i32; use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray}; use arrow::compute::cast; use arrow::datatypes::{DataType, Field, Int32Type, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; - use datafusion_execution::runtime_env::RuntimeEnv; + use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use futures::StreamExt as _; use std::sync::Arc; + use datafusion_execution::memory_pool::{FairSpillPool, MemoryPool}; #[tokio::test] async fn test_batch_spill_and_read() -> Result<()> { @@ -426,6 +428,65 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_batch_spill_to_memory_and_disk_and_read() -> Result<()> { + let schema: SchemaRef = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from_iter_values(0..1000)), + Arc::new(Int32Array::from_iter_values(1000..2000)), + ], + )?; + + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from_iter_values(2000..4000)), + Arc::new(Int32Array::from_iter_values(4000..6000)), + ], + )?; + + let num_rows = batch1.num_rows() + batch2.num_rows(); + let batches = vec![batch1, batch2]; + + // --- create small memory pool (simulate memory pressure) --- + let memory_limit_bytes = 20 * 1024; // 20KB + let memory_pool: Arc = Arc::new(FairSpillPool::new(memory_limit_bytes)); + + + // Construct SpillManager + let env = RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build_arc()?; + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema)); + + let results = spill_manager.spill_batches_auto(&batches, "TestAutoSpill")?; + assert_eq!(results.len(), 2); + + let mem_count = results.iter().filter(|r| matches!(r, SpillLocation::Memory(_))).count(); + let disk_count = results.iter().filter(|r| matches!(r, SpillLocation::Disk(_))).count(); + assert!(mem_count >= 1); + assert!(disk_count >= 1); + + let spilled_rows = spill_manager.metrics.spilled_rows.value(); + assert_eq!(spilled_rows, num_rows); + + for spill in results { + let stream = spill_manager.load_spilled_batch(spill)?; + let collected = collect(stream).await?; + assert!(!collected.is_empty()); + assert_eq!(collected[0].schema(), schema); + } + + Ok(()) + } + #[tokio::test] async fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> { // See https://github.com/apache/datafusion/issues/4658 diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index ad23bd66a021..e8f5037f764b 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -21,15 +21,17 @@ use arrow::array::StringViewArray; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_execution::runtime_env::RuntimeEnv; +use std::slice; use std::sync::Arc; -use datafusion_common::{config::SpillCompression, Result}; +use datafusion_common::{config::SpillCompression, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::SendableRecordBatchStream; use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream}; use crate::coop::cooperative; use crate::{common::spawn_buffered, metrics::SpillMetrics}; +use crate::spill::in_memory_spill_buffer::InMemorySpillBuffer; /// The `SpillManager` is responsible for the following tasks: /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. @@ -168,6 +170,47 @@ impl SpillManager { Ok(file.map(|f| (f, max_record_batch_size))) } + /// Automatically decides whether to spill the given RecordBatch to memory or disk, + /// depending on available memory pool capacity. + pub(crate) fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result { + let size = batch.get_sliced_size()?; + + // Check current memory usage and total limit from the runtime memory pool + let used = self.env.memory_pool.reserved(); + let limit = match self.env.memory_pool.memory_limit() { + datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l, + _ => usize::MAX, + }; + + // If there's enough memory (with a small safety margin), keep it in memory + if used + size * 3 / 2 <= limit { + let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?); + self.metrics.spilled_bytes.add(size); + self.metrics.spilled_rows.add(batch.num_rows()); + Ok(SpillLocation::Memory(buf)) + } else { + // Otherwise spill to disk using the existing SpillManager logic + let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else { + return Err(DataFusionError::Execution( + "failed to spill batch to disk".into(), + )); + }; + Ok(SpillLocation::Disk(file)) + } + } + + pub fn spill_batches_auto( + &self, + batches: &[RecordBatch], + request_msg: &str, + ) -> Result> { + let mut result = Vec::with_capacity(batches.len()); + for batch in batches { + result.push(self.spill_batch_auto(batch, request_msg)?); + } + Ok(result) + } + /// Reads a spill file as a stream. The file must be created by the current `SpillManager`. /// This method will generate output in FIFO order: the batch appended first /// will be read first. @@ -182,8 +225,25 @@ impl SpillManager { Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) } + + pub fn load_spilled_batch( + &self, + spill: SpillLocation, + ) -> Result { + match spill { + SpillLocation::Memory(buf) => Ok(buf.as_stream(Arc::clone(&self.schema))?), + SpillLocation::Disk(file) => self.read_spill_as_stream(file), + } + } +} + +#[derive(Debug)] +pub enum SpillLocation { + Memory(Arc), + Disk(RefCountedTempFile), } + pub(crate) trait GetSlicedSize { /// Returns the size of the `RecordBatch` when sliced. /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.