From e9874ada51f7f876395a7907e16a8af598b0e5b1 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 23 Apr 2021 12:16:08 +0200 Subject: [PATCH] recurse nested data and update release functions --- .gitignore | 1 + .../tests/test_sql.py | 22 +++--- arrow/src/array/ffi.rs | 78 ++++++++----------- arrow/src/ffi.rs | 54 ++++++++----- 4 files changed, 78 insertions(+), 77 deletions(-) diff --git a/.gitignore b/.gitignore index 5b3bf6c4a66e..b9a0d2294f17 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ target rusty-tags.vi .history .flatbuffers/ +.idea/ .vscode venv/* diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index c0de382057c1..ab40dd429230 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -83,17 +83,17 @@ def test_list_array(self): """ Python -> Rust -> Python """ - old_allocated = pyarrow.total_allocated_bytes() - a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64())) - b = arrow_pyarrow_integration_testing.round_trip(a) - - b.validate(full=True) - assert a.to_pylist() == b.to_pylist() - assert a.type == b.type - del a - del b - # No leak of C++ memory - self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + for _ in range(2): + old_allocated = pyarrow.total_allocated_bytes() + a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64())) + b = arrow_pyarrow_integration_testing.round_trip(a) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) diff --git a/arrow/src/array/ffi.rs b/arrow/src/array/ffi.rs index 450685bf522a..92cbc26bf1a3 100644 --- a/arrow/src/array/ffi.rs +++ b/arrow/src/array/ffi.rs @@ -26,7 +26,6 @@ use crate::{ use super::ArrayData; use crate::datatypes::DataType; -use crate::ffi::ArrowArray; impl TryFrom for ArrayData { type Error = ArrowError; @@ -60,62 +59,47 @@ impl TryFrom for ArrayData { } } +fn array_data_to_arrow_array( + value: ArrayData, + nullable: bool, +) -> Result { + let len = value.len(); + let offset = value.offset() as usize; + let null_count = value.null_count(); + let buffers = value.buffers().to_vec(); + let null_buffer = value.null_buffer().cloned(); + let child_data = value + .child_data() + .iter() + .map(|arr| array_data_to_arrow_array(arr.clone(), nullable)) + .collect::>>()?; + + unsafe { + ffi::ArrowArray::try_new( + value.data_type(), + len, + null_count, + null_buffer, + offset, + buffers, + child_data, + nullable, + ) + } +} + impl TryFrom for ffi::ArrowArray { type Error = ArrowError; fn try_from(value: ArrayData) -> Result { // If parent is nullable, then children also must be nullable - // so we pass this nullable to the creation of hte child data + // so we pass this nullable to the creation of the child data let nullable = match value.data_type() { DataType::List(field) => field.is_nullable(), DataType::LargeList(field) => field.is_nullable(), _ => false, }; - - let len = value.len(); - let offset = value.offset() as usize; - let null_count = value.null_count(); - let buffers = value.buffers().to_vec(); - let null_buffer = value.null_buffer().cloned(); - let child_data = value - .child_data() - .iter() - .map(|arr| { - let len = arr.len(); - let offset = arr.offset() as usize; - let null_count = arr.null_count(); - let buffers = arr.buffers().to_vec(); - let null_buffer = arr.null_buffer().cloned(); - - // Note: the nullable comes from the parent data. - unsafe { - ArrowArray::try_new( - arr.data_type(), - len, - null_count, - null_buffer, - offset, - buffers, - vec![], - nullable, - ) - .expect("infallible") - } - }) - .collect::>(); - - unsafe { - ffi::ArrowArray::try_new( - value.data_type(), - len, - null_count, - null_buffer, - offset, - buffers, - child_data, - nullable, - ) - } + array_data_to_arrow_array(value, nullable) } } diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 3a6d031ebd87..eef734089dd7 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -81,7 +81,7 @@ use std::{ ffi::CStr, ffi::CString, iter, - mem::{size_of, ManuallyDrop}, + mem::size_of, os::raw::c_char, ptr::{self, NonNull}, sync::Arc, @@ -93,6 +93,11 @@ use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; +const FLAG_NONE: i64 = 0; +const FLAG_DICTIONARY_ORDERED: i64 = 1; +const FLAG_NULLABLE: i64 = 2; +const FLAG_MAP_KEYS_SORTED: i64 = 4; + /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See /// This was created by bindgen @@ -112,10 +117,19 @@ pub struct FFI_ArrowSchema { // callback used to drop [FFI_ArrowSchema] when it is exported. unsafe extern "C" fn release_schema(schema: *mut FFI_ArrowSchema) { + // take ownership back to release it. let schema = &mut *schema; - // take ownership back to release it. + // release children + for i in 0..schema.n_children { + let child_ptr = *schema.children.add(i as usize); + let child = &*child_ptr; + child.release.map(|release| release(child_ptr)); + } + CString::from_raw(schema.format as *mut std::os::raw::c_char); + CString::from_raw(schema.name as *mut std::os::raw::c_char); + schema.release = None; } @@ -135,13 +149,14 @@ impl FFI_ArrowSchema { let n_children = children.len() as i64; let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowSchema; - let flags = if nullable { 2 } else { 0 }; + let flags = if nullable { FLAG_NULLABLE } else { FLAG_NONE }; let private_data = Box::new(SchemaPrivateData { children }); // FFI_ArrowSchema { format: CString::new(format).unwrap().into_raw(), - // For child data a non null string is expected and is called item + // TODO: get the field name for general nested data + // For List child data a non null string is expected and is called item name: CString::new("item").unwrap().into_raw(), metadata: std::ptr::null_mut(), flags, @@ -159,7 +174,7 @@ impl FFI_ArrowSchema { format: std::ptr::null_mut(), name: std::ptr::null_mut(), metadata: std::ptr::null_mut(), - flags: 0, + flags: FLAG_NONE, n_children: 0, children: ptr::null_mut(), dictionary: std::ptr::null_mut(), @@ -221,15 +236,13 @@ fn to_datatype( // at that point the child data is not yet known, but it is also not required to determine // the buffer length of the list arrays. "+l" => { - let nullable = schema.flags == 2; + let nullable = (schema.flags & FLAG_NULLABLE) != 0; // Safety // Should be set as this is expected from the C FFI definition debug_assert!(!schema.name.is_null()); - let name = unsafe { CString::from_raw(schema.name as *mut c_char) } - .into_string() + let name = unsafe { CStr::from_ptr(schema.name as *const c_char) } + .to_str() .unwrap(); - // prevent a double free - let name = ManuallyDrop::new(name); DataType::List(Box::new(Field::new( &name, child_type.unwrap_or(DataType::Null), @@ -237,15 +250,13 @@ fn to_datatype( ))) } "+L" => { - let nullable = schema.flags == 2; + let nullable = (schema.flags & FLAG_NULLABLE) != 0; // Safety // Should be set as this is expected from the C FFI definition debug_assert!(!schema.name.is_null()); - let name = unsafe { CString::from_raw(schema.name as *mut c_char) } - .into_string() + let name = unsafe { CStr::from_ptr(schema.name as *const c_char) } + .to_str() .unwrap(); - // prevent a double free - let name = ManuallyDrop::new(name); DataType::LargeList(Box::new(Field::new( &name, child_type.unwrap_or(DataType::Null), @@ -392,6 +403,13 @@ unsafe extern "C" fn release_array(array: *mut FFI_ArrowArray) { return; } let array = &mut *array; + + // release children + for i in 0..array.n_children { + let child_ptr = *array.children.add(i as usize); + let child = &*child_ptr; + child.release.map(|release| release(child_ptr)); + } // take ownership of `private_data`, therefore dropping it Box::from_raw(array.private_data as *mut PrivateData); @@ -654,7 +672,7 @@ impl ArrowArray { debug_assert_eq!(bits % 8, 0); (self.array.length as usize + 1) * (bits / 8) } - (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => { + (DataType::Utf8, 2) | (DataType::Binary, 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -666,9 +684,7 @@ impl ArrowArray { // get last offset (unsafe { *offset_buffer.add(len / size_of::() - 1) }) as usize } - (DataType::LargeUtf8, 2) - | (DataType::LargeBinary, 2) - | (DataType::LargeList(_), 2) => { + (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1)