Skip to content

Commit d63cac7

Browse files
committed
Use new API for appending values to avoid slices
1 parent 8302949 commit d63cac7

File tree

3 files changed

+105
-45
lines changed

3 files changed

+105
-45
lines changed

arrow-select/src/coalesce.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,15 @@ impl BatchCoalescer {
226226
return Ok(());
227227
}
228228

229+
// setup input rows
230+
assert_eq!(arrays.len(), self.in_progress_arrays.len());
231+
self.in_progress_arrays
232+
.iter_mut()
233+
.zip(arrays)
234+
.for_each(|(in_progress, array)| {
235+
in_progress.set_source(Some(array));
236+
});
237+
229238
// If pushing this batch would exceed the target batch size,
230239
// finish the current batch and start a new one
231240
let mut offset = 0;
@@ -234,10 +243,8 @@ impl BatchCoalescer {
234243
debug_assert!(remaining_rows > 0);
235244

236245
// Copy remaining_rows from each array
237-
for (idx, array) in arrays.iter().enumerate() {
238-
let array = array.slice(offset, remaining_rows);
239-
// TODO move slicing into InProgressArray
240-
self.in_progress_arrays[idx].push_array(array);
246+
for in_progress in self.in_progress_arrays.iter_mut() {
247+
in_progress.copy_rows(offset, remaining_rows)?;
241248
}
242249

243250
self.buffered_rows += remaining_rows;
@@ -250,18 +257,21 @@ impl BatchCoalescer {
250257
// Add any remaining rows to the buffer
251258
self.buffered_rows += num_rows;
252259
if num_rows > 0 {
253-
for (idx, array) in arrays.into_iter().enumerate() {
254-
// Push the array to the in-progress array
255-
let array = array.slice(offset, num_rows);
256-
// TODO move slicing into InProgressArray
257-
self.in_progress_arrays[idx].push_array(array);
260+
for in_progress in self.in_progress_arrays.iter_mut() {
261+
in_progress.copy_rows(offset, num_rows)?;
258262
}
259263
}
260264

261265
// If we have reached the target batch size, finalize the buffered batch
262266
if self.buffered_rows >= self.batch_size {
263267
self.finish_buffered_batch()?;
264268
}
269+
270+
// clear in progress sources (to allow the memory to be freed)
271+
for in_progress in self.in_progress_arrays.iter_mut() {
272+
in_progress.set_source(None);
273+
}
274+
265275
Ok(())
266276
}
267277

@@ -335,10 +345,17 @@ fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn
335345
/// [`StringViewArray`]: arrow_array::StringViewArray
336346
/// [`UInt32Array`]: arrow_array::UInt32Array
337347
trait InProgressArray: std::fmt::Debug + Send + Sync {
338-
/// Push a new array to the in-progress array
339-
fn push_array(&mut self, array: ArrayRef);
348+
/// Set the source array.
349+
///
350+
/// Subsequent calls to [`Self::copy_rows`] copy rows from this array into the in-progress array
351+
fn set_source(&mut self, source: Option<ArrayRef>);
352+
353+
/// copy rows from the source array into the in-progress array
354+
///
355+
/// Return an error if the source is not set
356+
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
340357

341-
/// Finish the currently in-progress array and clear state for the next
358+
/// Finish the currently in-progress array and clear state
342359
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
343360
}
344361

@@ -552,7 +569,6 @@ mod tests {
552569
let array = col_as_string_view("c0", &batch);
553570
let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
554571
assert_eq!(array.data_buffers().len(), 5);
555-
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
556572

557573
expect_buffer_layout(
558574
gc_array,
@@ -624,7 +640,7 @@ mod tests {
624640

625641
#[test]
626642
fn test_string_view_many_small_compact() {
627-
// The strings are 37 bytes long, so each batch has 200 * 28 = 5600 bytes
643+
// The long strings are 28 bytes each and (presumably) make up half of the 400 rows, so each batch has 200 * 28 = 5600 bytes
628644
let batch = stringview_batch_repeated(
629645
400,
630646
[Some("This string is 28 bytes long"), Some("small string")],

arrow-select/src/coalesce/byte_view.rs

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use std::sync::Arc;
2727

2828
/// InProgressArray for StringViewArray and BinaryViewArray
2929
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
30+
/// The source array
31+
source: Option<Source>,
3032
/// the target batch size (and thus size for views allocation)
3133
batch_size: usize,
3234
/// The in progress views
@@ -44,6 +46,15 @@ pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
4446
_phantom: PhantomData<B>,
4547
}
4648

49+
struct Source {
50+
/// The array to copy from
51+
array: ArrayRef,
52+
/// Should the strings from the source array be copied into new buffers?
53+
need_gc: bool,
54+
/// How many bytes were actually used in the source array's buffers?
55+
ideal_buffer_size: usize,
56+
}
57+
4758
// manually implement Debug because ByteViewType doesn't implement Debug
4859
impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
4960
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -63,6 +74,7 @@ impl<B: ByteViewType> InProgressByteViewArray<B> {
6374

6475
Self {
6576
batch_size,
77+
source: None,
6678
views: Vec::new(), // allocate in push
6779
nulls: NullBufferBuilder::new(batch_size), // no allocation
6880
current: None,
@@ -80,15 +92,6 @@ impl<B: ByteViewType> InProgressByteViewArray<B> {
8092
self.views.reserve(self.batch_size);
8193
}
8294

83-
/// Update self.nulls with the nulls from the StringViewArray
84-
fn push_nulls(&mut self, s: &GenericByteViewArray<B>) {
85-
if let Some(nulls) = s.nulls().as_ref() {
86-
self.nulls.append_buffer(nulls);
87-
} else {
88-
self.nulls.append_n_non_nulls(s.len());
89-
}
90-
}
91-
9295
/// Finishes in progress block, if any
9396
fn finish_current(&mut self) {
9497
let Some(next_buffer) = self.current.take() else {
@@ -263,38 +266,67 @@ impl<B: ByteViewType> InProgressByteViewArray<B> {
263266
}
264267

265268
impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
266-
fn push_array(&mut self, array: ArrayRef) {
267-
// If creating StringViewArray output, ensure input was valid utf8 too
269+
fn set_source(&mut self, source: Option<ArrayRef>) {
270+
self.source = source.map(|array| {
271+
let s = array.as_byte_view::<B>();
272+
273+
let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
274+
(false, 0)
275+
} else {
276+
let ideal_buffer_size = s.total_buffer_bytes_used();
277+
let actual_buffer_size = s.get_buffer_memory_size();
278+
// copying strings is expensive, so only do it if the array is
279+
// sparse (uses at least 2x the memory it needs)
280+
let need_gc =
281+
ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
282+
(need_gc, ideal_buffer_size)
283+
};
284+
285+
Source {
286+
array,
287+
need_gc,
288+
ideal_buffer_size,
289+
}
290+
})
291+
}
292+
293+
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
268294
self.ensure_capacity();
269-
let s = array.as_byte_view::<B>();
295+
let source = self.source.take().ok_or_else(|| {
296+
ArrowError::InvalidArgumentError("InProgressByteViewArray: source not set".to_string())
297+
})?;
270298

271-
// add any nulls, as necessary
272-
self.push_nulls(s);
299+
// If creating StringViewArray output, ensure input was valid utf8 too
300+
let s = source.array.as_byte_view::<B>();
273301

274-
// If there are no data buffers in s (all inlined views), can append the
275-
// views/nulls and done
276-
if s.data_buffers().is_empty() {
277-
self.views.extend_from_slice(s.views().as_ref());
278-
return;
279-
}
302+
// add any nulls, as necessary
303+
if let Some(nulls) = s.nulls().as_ref() {
304+
let nulls = nulls.slice(offset, len);
305+
self.nulls.append_buffer(&nulls);
306+
} else {
307+
self.nulls.append_n_non_nulls(len);
308+
};
280309

281-
let ideal_buffer_size = s.total_buffer_bytes_used();
282-
let actual_buffer_size = s.get_buffer_memory_size();
283310
let buffers = s.data_buffers();
311+
let views = &s.views().as_ref()[offset..offset + len];
284312

285-
// None of the views references the buffers (e.g. sliced)
286-
if ideal_buffer_size == 0 {
287-
self.views.extend_from_slice(s.views().as_ref());
288-
return;
313+
// If no bytes are used in the source's data buffers (there are no buffers, or no
314+
// view references them), the views can be appended unchanged and we are done
315+
if source.ideal_buffer_size == 0 {
316+
self.views.extend_from_slice(views);
317+
self.source = Some(source);
318+
return Ok(());
289319
}
290320

291321
// Copying the strings into a buffer can be time-consuming so
292322
// only do it if the array is sparse
293-
if actual_buffer_size > (ideal_buffer_size * 2) {
294-
self.append_views_and_copy_strings(s.views(), ideal_buffer_size, buffers);
323+
if source.need_gc {
324+
self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
295325
} else {
296-
self.append_views_and_update_buffer_index(s.views(), buffers);
326+
self.append_views_and_update_buffer_index(views, buffers);
297327
}
328+
self.source = Some(source);
329+
Ok(())
298330
}
299331

300332
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {

arrow-select/src/coalesce/generic.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,33 @@ use arrow_schema::ArrowError;
2727
/// [`concat`]: crate::concat::concat
2828
#[derive(Debug)]
2929
pub(crate) struct GenericInProgressArray {
30-
/// The buffered arrays
30+
/// The current source
31+
source: Option<ArrayRef>,
32+
/// The buffered array slices
3133
buffered_arrays: Vec<ArrayRef>,
3234
}
3335

3436
impl GenericInProgressArray {
3537
/// Create a new `GenericInProgressArray`
3638
pub(crate) fn new() -> Self {
3739
Self {
40+
source: None,
3841
buffered_arrays: vec![],
3942
}
4043
}
4144
}
4245
impl InProgressArray for GenericInProgressArray {
43-
fn push_array(&mut self, array: ArrayRef) {
46+
fn set_source(&mut self, source: Option<ArrayRef>) {
47+
self.source = source
48+
}
49+
50+
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
51+
let source = self.source.as_ref().ok_or_else(|| {
52+
ArrowError::InvalidArgumentError("GenericInProgressArray: source not set".to_string())
53+
})?;
54+
let array = source.slice(offset, len);
4455
self.buffered_arrays.push(array);
56+
Ok(())
4557
}
4658

4759
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {

0 commit comments

Comments
 (0)