Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ target
rusty-tags.vi
.history
.flatbuffers/
.idea/

.vscode
venv/*
22 changes: 11 additions & 11 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ def test_list_array(self):
"""
Python -> Rust -> Python
"""
old_allocated = pyarrow.total_allocated_bytes()
a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64()))
b = arrow_pyarrow_integration_testing.round_trip(a)

b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
del a
del b
# No leak of C++ memory
self.assertEqual(old_allocated, pyarrow.total_allocated_bytes())
for _ in range(2):
old_allocated = pyarrow.total_allocated_bytes()
a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64()))
b = arrow_pyarrow_integration_testing.round_trip(a)

b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
del a
del b
# No leak of C++ memory
self.assertEqual(old_allocated, pyarrow.total_allocated_bytes())


78 changes: 31 additions & 47 deletions arrow/src/array/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::{

use super::ArrayData;
use crate::datatypes::DataType;
use crate::ffi::ArrowArray;

impl TryFrom<ffi::ArrowArray> for ArrayData {
type Error = ArrowError;
Expand Down Expand Up @@ -60,62 +59,47 @@ impl TryFrom<ffi::ArrowArray> for ArrayData {
}
}

/// Recursively converts `value` and all of its child data into an
/// [`ffi::ArrowArray`].
///
/// The same `nullable` flag is forwarded unchanged to every level of the
/// recursion, so children inherit the nullability chosen by the caller.
///
/// # Errors
///
/// Returns the first error produced while converting a child, or the error
/// returned by `ffi::ArrowArray::try_new` for `value` itself.
fn array_data_to_arrow_array(
    value: ArrayData,
    nullable: bool,
) -> Result<ffi::ArrowArray> {
    // Convert the children depth-first; the first failure aborts the whole conversion.
    let mut children = Vec::with_capacity(value.child_data().len());
    for child in value.child_data() {
        children.push(array_data_to_arrow_array(child.clone(), nullable)?);
    }

    let validity = value.null_buffer().cloned();
    let data_buffers = value.buffers().to_vec();

    // NOTE(review): `try_new` is an unsafe constructor; every argument here is
    // taken from `value` itself — confirm that satisfies its safety contract.
    unsafe {
        ffi::ArrowArray::try_new(
            value.data_type(),
            value.len(),
            value.null_count(),
            validity,
            value.offset() as usize,
            data_buffers,
            children,
            nullable,
        )
    }
}

/// Exports an [`ArrayData`] through the FFI representation.
///
/// The conversion itself is delegated to `array_data_to_arrow_array`, which
/// walks the child data recursively; this impl only decides the `nullable`
/// flag that is handed to it.
impl TryFrom<ArrayData> for ffi::ArrowArray {
    type Error = ArrowError;

    /// Converts `value` into an [`ffi::ArrowArray`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `array_data_to_arrow_array`.
    fn try_from(value: ArrayData) -> Result<Self> {
        // If parent is nullable, then children also must be nullable
        // so we pass this nullable to the creation of the child data
        let nullable = match value.data_type() {
            DataType::List(field) | DataType::LargeList(field) => field.is_nullable(),
            _ => false,
        };

        array_data_to_arrow_array(value, nullable)
    }
}

Expand Down
54 changes: 35 additions & 19 deletions arrow/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use std::{
ffi::CStr,
ffi::CString,
iter,
mem::{size_of, ManuallyDrop},
mem::size_of,
os::raw::c_char,
ptr::{self, NonNull},
sync::Arc,
Expand All @@ -93,6 +93,11 @@ use crate::datatypes::{DataType, Field, TimeUnit};
use crate::error::{ArrowError, Result};
use crate::util::bit_util;

// Bit values for `FFI_ArrowSchema::flags`; they are tested with a bitwise AND,
// so several may be set at once.
// NOTE(review): the values presumably mirror the `ARROW_FLAG_*` constants of
// the Arrow C Data Interface — confirm against the specification.

// No flag set.
const FLAG_NONE: i64 = 0;
// Presumably: the dictionary of a dictionary-encoded type is ordered.
const FLAG_DICTIONARY_ORDERED: i64 = 1;
// The field is nullable.
const FLAG_NULLABLE: i64 = 2;
// Presumably: the keys of a map type are sorted.
const FLAG_MAP_KEYS_SORTED: i64 = 4;

/// ABI-compatible struct for `ArrowSchema` from C Data Interface
/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
/// This was created by bindgen
Expand All @@ -112,10 +117,19 @@ pub struct FFI_ArrowSchema {

/// Callback used to drop an [`FFI_ArrowSchema`] when it is exported.
///
/// Invokes the `release` callback of every child that still has one, frees the
/// `format` and `name` C strings, and finally marks the schema as released by
/// setting its `release` member to `None`.
///
/// # Safety
///
/// `schema` must be null or point to a valid `FFI_ArrowSchema` whose non-null
/// `format` and `name` pointers were obtained from `CString::into_raw`, and
/// whose `children` array holds `n_children` valid pointers.
unsafe extern "C" fn release_schema(schema: *mut FFI_ArrowSchema) {
    // Nothing to release for a null pointer.
    if schema.is_null() {
        return;
    }
    let schema = &mut *schema;

    // release children
    for i in 0..schema.n_children {
        let child_ptr = *schema.children.add(i as usize);
        if let Some(release) = (*child_ptr).release {
            release(child_ptr);
        }
    }

    // take ownership back of the C strings to release them;
    // `CString::from_raw` must never be handed a null pointer.
    if !schema.format.is_null() {
        drop(CString::from_raw(schema.format as *mut std::os::raw::c_char));
    }
    if !schema.name.is_null() {
        drop(CString::from_raw(schema.name as *mut std::os::raw::c_char));
    }

    schema.release = None;
}
Expand All @@ -135,13 +149,14 @@ impl FFI_ArrowSchema {
let n_children = children.len() as i64;
let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowSchema;

let flags = if nullable { 2 } else { 0 };
let flags = if nullable { FLAG_NULLABLE } else { FLAG_NONE };

let private_data = Box::new(SchemaPrivateData { children });
// <https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema>
FFI_ArrowSchema {
format: CString::new(format).unwrap().into_raw(),
// For child data a non null string is expected and is called item
// TODO: get the field name for general nested data
// For List child data a non null string is expected and is called item
name: CString::new("item").unwrap().into_raw(),
metadata: std::ptr::null_mut(),
flags,
Expand All @@ -159,7 +174,7 @@ impl FFI_ArrowSchema {
format: std::ptr::null_mut(),
name: std::ptr::null_mut(),
metadata: std::ptr::null_mut(),
flags: 0,
flags: FLAG_NONE,
n_children: 0,
children: ptr::null_mut(),
dictionary: std::ptr::null_mut(),
Expand Down Expand Up @@ -221,31 +236,27 @@ fn to_datatype(
// at that point the child data is not yet known, but it is also not required to determine
// the buffer length of the list arrays.
"+l" => {
let nullable = schema.flags == 2;
let nullable = (schema.flags & FLAG_NULLABLE) != 0;
// Safety
// Should be set as this is expected from the C FFI definition
debug_assert!(!schema.name.is_null());
let name = unsafe { CString::from_raw(schema.name as *mut c_char) }
.into_string()
let name = unsafe { CStr::from_ptr(schema.name as *const c_char) }
.to_str()
.unwrap();
// prevent a double free
let name = ManuallyDrop::new(name);
DataType::List(Box::new(Field::new(
&name,
child_type.unwrap_or(DataType::Null),
nullable,
)))
}
"+L" => {
let nullable = schema.flags == 2;
let nullable = (schema.flags & FLAG_NULLABLE) != 0;
// Safety
// Should be set as this is expected from the C FFI definition
debug_assert!(!schema.name.is_null());
let name = unsafe { CString::from_raw(schema.name as *mut c_char) }
.into_string()
let name = unsafe { CStr::from_ptr(schema.name as *const c_char) }
.to_str()
.unwrap();
// prevent a double free
let name = ManuallyDrop::new(name);
DataType::LargeList(Box::new(Field::new(
&name,
child_type.unwrap_or(DataType::Null),
Expand Down Expand Up @@ -392,6 +403,13 @@ unsafe extern "C" fn release_array(array: *mut FFI_ArrowArray) {
return;
}
let array = &mut *array;

// release children
for i in 0..array.n_children {
let child_ptr = *array.children.add(i as usize);
let child = &*child_ptr;
child.release.map(|release| release(child_ptr));
}
// take ownership of `private_data`, therefore dropping it
Box::from_raw(array.private_data as *mut PrivateData);

Expand Down Expand Up @@ -654,7 +672,7 @@ impl ArrowArray {
debug_assert_eq!(bits % 8, 0);
(self.array.length as usize + 1) * (bits / 8)
}
(DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => {
(DataType::Utf8, 2) | (DataType::Binary, 2) => {
// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1)?;
// first buffer is the null buffer => add(1)
Expand All @@ -666,9 +684,7 @@ impl ArrowArray {
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i32>() - 1) }) as usize
}
(DataType::LargeUtf8, 2)
| (DataType::LargeBinary, 2)
| (DataType::LargeList(_), 2) => {
(DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => {
// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1)?;
// first buffer is the null buffer => add(1)
Expand Down