46 changes: 46 additions & 0 deletions datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs
@@ -0,0 +1,46 @@
use crate::memory::MemoryStream;
use crate::spill::spill_manager::GetSlicedSize;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::SendableRecordBatchStream;
use std::sync::Arc;

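/// Holds `RecordBatch`es that were "spilled" to memory rather than disk,
/// together with their total in-memory size in bytes.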
#[derive(Debug)]
pub struct InMemorySpillBuffer {
batches: Vec<RecordBatch>,
total_bytes: usize,
}

impl InMemorySpillBuffer {
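/// Creates a buffer holding a single cloned batch and records its sliced size.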
pub fn from_batch(batch: &RecordBatch) -> Result<Self> {
Ok(Self {
batches: vec![batch.clone()],
total_bytes: batch.get_sliced_size()?,
})
}

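/// Creates a buffer from a slice of batches, summing their sliced sizes.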
pub fn from_batches(batches: &[RecordBatch]) -> Result<Self> {
let mut total_bytes = 0;
let mut owned = Vec::with_capacity(batches.len());
for b in batches {
total_bytes += b.get_sliced_size()?;
owned.push(b.clone());
}
Ok(Self {
batches: owned,
total_bytes,
})
}

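/// Returns the buffered batches as a stream, in the order they were stored.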
pub fn as_stream(
self: Arc<Self>,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let stream = MemoryStream::try_new(self.batches.clone(), schema, None)?;
Ok(Box::pin(stream))
}

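/// Total in-memory size of the buffered batches, in bytes.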
pub fn size(&self) -> usize {
self.total_bytes
}
}
65 changes: 63 additions & 2 deletions datafusion/physical-plan/src/spill/mod.rs
@@ -19,6 +19,7 @@

pub(crate) mod in_progress_spill_file;
pub(crate) mod spill_manager;
pub(crate) mod in_memory_spill_buffer;

use std::fs::File;
use std::io::BufReader;
@@ -376,17 +377,18 @@ mod tests {
use crate::common::collect;
use crate::metrics::ExecutionPlanMetricsSet;
use crate::metrics::SpillMetrics;
use crate::spill::spill_manager::SpillManager;
use crate::spill::spill_manager::{SpillLocation, SpillManager};
use crate::test::build_table_i32;
use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use futures::StreamExt as _;

use std::sync::Arc;
use datafusion_execution::memory_pool::{FairSpillPool, MemoryPool};

#[tokio::test]
async fn test_batch_spill_and_read() -> Result<()> {
@@ -426,6 +428,65 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_batch_spill_to_memory_and_disk_and_read() -> Result<()> {
let schema: SchemaRef = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));

let batch1 = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from_iter_values(0..1000)),
Arc::new(Int32Array::from_iter_values(1000..2000)),
],
)?;

let batch2 = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from_iter_values(2000..4000)),
Arc::new(Int32Array::from_iter_values(4000..6000)),
],
)?;

let num_rows = batch1.num_rows() + batch2.num_rows();
let batches = vec![batch1, batch2];

// --- create small memory pool (simulate memory pressure) ---
let memory_limit_bytes = 20 * 1024; // 20KB
let memory_pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(memory_limit_bytes));

// Construct SpillManager
let env = RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.build_arc()?;
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));

let results = spill_manager.spill_batches_auto(&batches, "TestAutoSpill")?;
assert_eq!(results.len(), 2);

let mem_count = results
.iter()
.filter(|r| matches!(r, SpillLocation::Memory(_)))
.count();
let disk_count = results
.iter()
.filter(|r| matches!(r, SpillLocation::Disk(_)))
.count();
assert!(mem_count >= 1);
assert!(disk_count >= 1);

let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

for spill in results {
let stream = spill_manager.load_spilled_batch(spill)?;
let collected = collect(stream).await?;
assert!(!collected.is_empty());
assert_eq!(collected[0].schema(), schema);
}

Ok(())
}

#[tokio::test]
async fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
// See https://github.com/apache/datafusion/issues/4658
62 changes: 61 additions & 1 deletion datafusion/physical-plan/src/spill/spill_manager.rs
@@ -21,15 +21,17 @@ use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::slice;
use std::sync::Arc;

use datafusion_common::{config::SpillCompression, Result};
use datafusion_common::{config::SpillCompression, DataFusionError, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::SendableRecordBatchStream;

use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};
use crate::spill::in_memory_spill_buffer::InMemorySpillBuffer;

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -168,6 +170,47 @@ impl SpillManager {
Ok(file.map(|f| (f, max_record_batch_size)))
}

/// Automatically decides whether to spill the given RecordBatch to memory or disk,
/// depending on available memory pool capacity.
pub(crate) fn spill_batch_auto(
&self,
batch: &RecordBatch,
request_msg: &str,
) -> Result<SpillLocation> {
let size = batch.get_sliced_size()?;

// Check current memory usage and total limit from the runtime memory pool
let used = self.env.memory_pool.reserved();
let limit = match self.env.memory_pool.memory_limit() {
datafusion_execution::memory_pool::MemoryLimit::Finite(l) => l,
_ => usize::MAX,
};

// If there's enough headroom (1.5x the batch size as a safety margin), keep the batch in memory
if used + size * 3 / 2 <= limit {
let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
self.metrics.spilled_bytes.add(size);
self.metrics.spilled_rows.add(batch.num_rows());
Ok(SpillLocation::Memory(buf))
} else {
// Otherwise spill to disk using the existing SpillManager logic
let Some(file) =
self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)?
else {
return Err(DataFusionError::Execution(
"failed to spill batch to disk".into(),
));
};
Ok(SpillLocation::Disk(file))
}
}

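/// Spills each batch via [`Self::spill_batch_auto`] and returns one
/// [`SpillLocation`] per input batch.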
pub fn spill_batches_auto(
&self,
batches: &[RecordBatch],
request_msg: &str,
) -> Result<Vec<SpillLocation>> {
let mut result = Vec::with_capacity(batches.len());
for batch in batches {
result.push(self.spill_batch_auto(batch, request_msg)?);
}
Ok(result)
}

/// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
/// This method will generate output in FIFO order: the batch appended first
/// will be read first.
@@ -182,8 +225,25 @@

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
}

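/// Reads a previously spilled batch back as a stream, regardless of whether
/// it was kept in memory or written to a temp file on disk.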
pub fn load_spilled_batch(
&self,
spill: SpillLocation,
) -> Result<SendableRecordBatchStream> {
match spill {
SpillLocation::Memory(buf) => buf.as_stream(Arc::clone(&self.schema)),
SpillLocation::Disk(file) => self.read_spill_as_stream(file),
}
}
}

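/// Where a spilled batch ended up: buffered in memory or written to a
/// temporary file on disk.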
#[derive(Debug)]
pub enum SpillLocation {
Memory(Arc<InMemorySpillBuffer>),
Disk(RefCountedTempFile),
}

pub(crate) trait GetSlicedSize {
/// Returns the size of the `RecordBatch` when sliced.
/// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
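A minimal usage sketch of the new API, assuming a `SpillManager` named `spill_manager`, a `RecordBatch` named `batch`, and an async context inside the crate (the names and the label string are hypothetical; the round trip mirrors `test_batch_spill_to_memory_and_disk_and_read` above):

// Spill to memory or disk depending on memory-pool headroom (hypothetical names).
let location = spill_manager.spill_batch_auto(&batch, "ExampleOperator")?;
// Read the spilled data back as a stream, wherever it landed.
let stream = spill_manager.load_spilled_batch(location)?;
let batches = crate::common::collect(stream).await?;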