
Commit 09232d2

osipovartem authored and eadgbear committed

Fix IPC arrow format issue (#280)

* Fix IPC arrow format issue
* Fix IPC arrow format issue
* Fix comments

1 parent 2ddcf09 commit 09232d2

File tree

5 files changed: 178 additions & 80 deletions

crates/control_plane/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@ datafusion-functions-json = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion_iceberg = { workspace = true }

-flatbuffers = { version = "24.3.25" }
 futures = { workspace = true }
 iceberg-rest-catalog = { workspace = true }
 iceberg-rust = { workspace = true }
crates/control_plane/src/models/mod.rs

Lines changed: 34 additions & 14 deletions
@@ -16,7 +16,7 @@
 // under the License.

 use arrow::array::RecordBatch;
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::{DataType, Field, TimeUnit};
 use chrono::{NaiveDateTime, Utc};
 use iceberg_rust::object_store::ObjectStoreBuilder;
 use object_store::aws::AmazonS3Builder;
@@ -499,17 +499,25 @@ impl ColumnInfo {
             DataType::Date32 | DataType::Date64 => {
                 column_info.r#type = "date".to_string();
             }
-            DataType::Timestamp(_, _) => {
+            DataType::Timestamp(unit, _) => {
                 column_info.r#type = "timestamp_ntz".to_string();
                 column_info.precision = Some(0);
-                column_info.scale = Some(9);
+                let scale = match unit {
+                    TimeUnit::Second => 0,
+                    TimeUnit::Millisecond => 3,
+                    TimeUnit::Microsecond => 6,
+                    TimeUnit::Nanosecond => 9,
+                };
+                column_info.scale = Some(scale);
             }
             DataType::Binary => {
                 column_info.r#type = "binary".to_string();
                 column_info.byte_length = Some(8_388_608);
                 column_info.length = Some(8_388_608);
             }
-            _ => {}
+            _ => {
+                column_info.r#type = "text".to_string();
+            }
         }
         column_info
     }
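The scale values here are the number of fractional-second digits implied by each Arrow TimeUnit. The old code reported scale 9 for every timestamp column, so a second-resolution column advertised nanosecond precision. A minimal sketch of why a client that trusts the reported scale would misread such a value (decode_epoch_seconds is a hypothetical helper, for illustration only):

    // A client recovers seconds by dividing the raw integer by 10^scale.
    fn decode_epoch_seconds(raw: i64, scale: i32) -> f64 {
        raw as f64 / 10f64.powi(scale)
    }

    fn main() {
        let raw = 1_627_846_261_i64; // a second-resolution timestamp
        // Old behavior: scale hard-coded to 9 shrinks the value a billionfold.
        assert!(decode_epoch_seconds(raw, 9) < 2.0);
        // Fixed behavior: a second-unit column reports scale 0.
        assert!((decode_epoch_seconds(raw, 0) - 1_627_846_261.0).abs() < f64::EPSILON);
    }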
@@ -722,23 +730,35 @@ mod tests {
         assert_eq!(column_info.name, "test_field");
         assert_eq!(column_info.r#type, "date");

-        let field = Field::new(
-            "test_field",
-            DataType::Timestamp(TimeUnit::Second, None),
-            false,
-        );
-        let column_info = ColumnInfo::from_field(&field);
-        assert_eq!(column_info.name, "test_field");
-        assert_eq!(column_info.r#type, "timestamp_ntz");
-        assert_eq!(column_info.precision.unwrap(), 0);
-        assert_eq!(column_info.scale.unwrap(), 9);
+        let units = [
+            (TimeUnit::Second, 0),
+            (TimeUnit::Millisecond, 3),
+            (TimeUnit::Microsecond, 6),
+            (TimeUnit::Nanosecond, 9),
+        ];
+        for (unit, scale) in units {
+            let field = Field::new("test_field", DataType::Timestamp(unit, None), false);
+            let column_info = ColumnInfo::from_field(&field);
+            assert_eq!(column_info.name, "test_field");
+            assert_eq!(column_info.r#type, "timestamp_ntz");
+            assert_eq!(column_info.precision.unwrap(), 0);
+            assert_eq!(column_info.scale.unwrap(), scale);
+        }

         let field = Field::new("test_field", DataType::Binary, false);
         let column_info = ColumnInfo::from_field(&field);
         assert_eq!(column_info.name, "test_field");
         assert_eq!(column_info.r#type, "binary");
         assert_eq!(column_info.byte_length.unwrap(), 8_388_608);
         assert_eq!(column_info.length.unwrap(), 8_388_608);
+
+        // Any other type
+        let field = Field::new("test_field", DataType::Utf8View, false);
+        let column_info = ColumnInfo::from_field(&field);
+        assert_eq!(column_info.name, "test_field");
+        assert_eq!(column_info.r#type, "text");
+        assert_eq!(column_info.byte_length, None);
+        assert_eq!(column_info.length, None);
     }

     #[tokio::test]

crates/control_plane/src/service.rs

Lines changed: 12 additions & 3 deletions
@@ -19,7 +19,7 @@ use crate::error::{self, ControlPlaneError, ControlPlaneResult};
 use crate::models::{ColumnInfo, Credentials, StorageProfile, StorageProfileCreateRequest};
 use crate::models::{Warehouse, WarehouseCreateRequest};
 use crate::repository::{StorageProfileRepository, WarehouseRepository};
-use crate::utils::convert_record_batches;
+use crate::utils::{convert_record_batches, Config};
 use arrow::record_batch::RecordBatch;
 use arrow_json::writer::JsonArray;
 use arrow_json::WriterBuilder;
@@ -72,11 +72,13 @@ pub trait ControlService: Send + Sync {
     async fn create_session(&self, session_id: String) -> ControlPlaneResult<()>;

     async fn delete_session(&self, session_id: String) -> ControlPlaneResult<()>;
+    fn config(&self) -> &Config;
 }

 pub struct ControlServiceImpl {
     metastore: Arc<dyn Metastore>,
     df_sessions: Arc<RwLock<HashMap<String, SqlExecutor>>>,
+    config: Config,
 }

 impl ControlServiceImpl {
@@ -87,6 +89,7 @@ impl ControlServiceImpl {
         Self {
             metastore,
             df_sessions,
+            config: Config::default(),
         }
     }
 }
@@ -244,8 +247,11 @@ impl ControlService for ControlServiceImpl {
             .context(crate::error::ExecutionSnafu)?
             .into_iter()
             .collect::<Vec<_>>();
+
+        let serialization_format = self.config().dbt_serialization_format;
         // Add columns dbt metadata to each field
-        convert_record_batches(records).context(crate::error::DataFusionQuerySnafu { query })
+        convert_record_batches(records, serialization_format)
+            .context(error::DataFusionQuerySnafu { query })
     }

     #[tracing::instrument(level = "debug", skip(self))]
@@ -453,12 +459,15 @@ impl ControlService for ControlServiceImpl {

         Ok(())
     }
+
+    fn config(&self) -> &Config {
+        &self.config
+    }
 }

 #[cfg(test)]
 #[allow(clippy::unwrap_used, clippy::expect_used)]
 mod tests {
-
     use super::*;
     use crate::error::ControlPlaneError;
     use crate::models::{
crates/control_plane/src/utils.rs

Lines changed: 127 additions & 56 deletions
@@ -17,7 +17,7 @@

 use crate::models::ColumnInfo;
 use arrow::array::{
-    Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    Array, Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
     TimestampNanosecondArray, TimestampSecondArray, UnionArray,
 };
 use arrow::datatypes::{Field, Schema, TimeUnit};
@@ -26,7 +26,45 @@ use chrono::DateTime;
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::datatypes::DataType;
 use datafusion::common::Result as DataFusionResult;
+use std::fmt::Display;
 use std::sync::Arc;
+use std::{env, fmt};
+
+pub struct Config {
+    pub dbt_serialization_format: SerializationFormat,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            dbt_serialization_format: SerializationFormat::new(),
+        }
+    }
+}
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub enum SerializationFormat {
+    Arrow,
+    Json,
+}
+
+impl Display for SerializationFormat {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Arrow => write!(f, "arrow"),
+            Self::Json => write!(f, "json"),
+        }
+    }
+}
+
+impl SerializationFormat {
+    fn new() -> Self {
+        let var = env::var("DBT_SERIALIZATION_FORMAT").unwrap_or_else(|_| "json".to_string());
+        match var.to_lowercase().as_str() {
+            "arrow" => Self::Arrow,
+            _ => Self::Json,
+        }
+    }
+}

 #[must_use]
 pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, ArrayRef)> {
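SerializationFormat::new() makes the output format switchable at deploy time: matching on DBT_SERIALIZATION_FORMAT is case-insensitive, and unset or unrecognized values fall back to Json. A runnable restatement of just that selection rule, with the env lookup factored out (select_format is a hypothetical name):

    #[derive(Debug, PartialEq)]
    enum SerializationFormat {
        Arrow,
        Json,
    }

    // Mirrors SerializationFormat::new(), which feeds this from
    // env::var("DBT_SERIALIZATION_FORMAT").
    fn select_format(var: Option<&str>) -> SerializationFormat {
        match var.unwrap_or("json").to_lowercase().as_str() {
            "arrow" => SerializationFormat::Arrow,
            _ => SerializationFormat::Json, // unknown values fall back to JSON
        }
    }

    fn main() {
        assert_eq!(select_format(Some("ARROW")), SerializationFormat::Arrow); // case-insensitive
        assert_eq!(select_format(Some("csv")), SerializationFormat::Json); // unknown value
        assert_eq!(select_format(None), SerializationFormat::Json); // unset
    }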
@@ -42,6 +80,7 @@ pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, ArrayRef)> {

 pub fn convert_record_batches(
     records: Vec<RecordBatch>,
+    serialization_format: SerializationFormat,
 ) -> DataFusionResult<(Vec<RecordBatch>, Vec<ColumnInfo>)> {
     let mut converted_batches = Vec::new();
     let column_infos = ColumnInfo::from_batch(&records);
@@ -71,7 +110,8 @@
                 }
             }
             DataType::Timestamp(unit, _) => {
-                let converted_column = convert_timestamp_to_struct(column, *unit);
+                let converted_column =
+                    convert_timestamp_to_struct(column, *unit, serialization_format);
                 fields.push(
                     Field::new(
                         field.name(),
@@ -97,63 +137,80 @@
     Ok((converted_batches.clone(), column_infos))
 }

+macro_rules! downcast_and_iter {
+    ($column:expr, $array_type:ty) => {
+        $column
+            .as_any()
+            .downcast_ref::<$array_type>()
+            .unwrap()
+            .into_iter()
+    };
+}
+
 #[allow(
     clippy::unwrap_used,
     clippy::as_conversions,
     clippy::cast_possible_truncation
 )]
-fn convert_timestamp_to_struct(column: &ArrayRef, unit: TimeUnit) -> ArrayRef {
-    let timestamps: Vec<_> = match unit {
-        TimeUnit::Second => column
-            .as_any()
-            .downcast_ref::<TimestampSecondArray>()
-            .unwrap()
-            .iter()
-            .map(|x| {
-                x.map(|ts| {
-                    let ts = DateTime::from_timestamp(ts, 0).unwrap();
-                    format!("{}", ts.timestamp())
-                })
-            })
-            .collect(),
-        TimeUnit::Millisecond => column
-            .as_any()
-            .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap()
-            .iter()
-            .map(|x| {
-                x.map(|ts| {
-                    let ts = DateTime::from_timestamp_millis(ts).unwrap();
-                    format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_millis())
-                })
-            })
-            .collect(),
-        TimeUnit::Microsecond => column
-            .as_any()
-            .downcast_ref::<TimestampMicrosecondArray>()
-            .unwrap()
-            .iter()
-            .map(|x| {
-                x.map(|ts| {
-                    let ts = DateTime::from_timestamp_micros(ts).unwrap();
-                    format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
-                })
-            })
-            .collect(),
-        TimeUnit::Nanosecond => column
-            .as_any()
-            .downcast_ref::<TimestampNanosecondArray>()
-            .unwrap()
-            .iter()
-            .map(|x| {
-                x.map(|ts| {
-                    let ts = DateTime::from_timestamp_nanos(ts);
-                    format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_nanos())
-                })
-            })
-            .collect(),
-    };
-    Arc::new(StringArray::from(timestamps)) as ArrayRef
+fn convert_timestamp_to_struct(
+    column: &ArrayRef,
+    unit: TimeUnit,
+    ser: SerializationFormat,
+) -> ArrayRef {
+    match ser {
+        SerializationFormat::Arrow => {
+            let timestamps: Vec<_> = match unit {
+                TimeUnit::Second => downcast_and_iter!(column, TimestampSecondArray).collect(),
+                TimeUnit::Millisecond => {
+                    downcast_and_iter!(column, TimestampMillisecondArray).collect()
+                }
+                TimeUnit::Microsecond => {
+                    downcast_and_iter!(column, TimestampMicrosecondArray).collect()
+                }
+                TimeUnit::Nanosecond => {
+                    downcast_and_iter!(column, TimestampNanosecondArray).collect()
+                }
+            };
+            Arc::new(Int64Array::from(timestamps)) as ArrayRef
+        }
+        SerializationFormat::Json => {
+            let timestamps: Vec<_> = match unit {
+                TimeUnit::Second => downcast_and_iter!(column, TimestampSecondArray)
+                    .map(|x| {
+                        x.map(|ts| {
+                            let ts = DateTime::from_timestamp(ts, 0).unwrap();
+                            format!("{}", ts.timestamp())
+                        })
+                    })
+                    .collect(),
+                TimeUnit::Millisecond => downcast_and_iter!(column, TimestampMillisecondArray)
+                    .map(|x| {
+                        x.map(|ts| {
+                            let ts = DateTime::from_timestamp_millis(ts).unwrap();
+                            format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_millis())
+                        })
+                    })
+                    .collect(),
+                TimeUnit::Microsecond => downcast_and_iter!(column, TimestampMicrosecondArray)
+                    .map(|x| {
+                        x.map(|ts| {
+                            let ts = DateTime::from_timestamp_micros(ts).unwrap();
+                            format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
+                        })
+                    })
+                    .collect(),
+                TimeUnit::Nanosecond => downcast_and_iter!(column, TimestampNanosecondArray)
+                    .map(|x| {
+                        x.map(|ts| {
+                            let ts = DateTime::from_timestamp_nanos(ts);
+                            format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_nanos())
+                        })
+                    })
+                    .collect(),
+            };
+            Arc::new(StringArray::from(timestamps)) as ArrayRef
+        }
+    }
 }

 #[cfg(test)]
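The downcast_and_iter! macro only abbreviates the downcast-then-iterate boilerplate the old code repeated for each array type. Expanded by hand for one concrete type, it is equivalent to roughly this sketch (raw_seconds is a hypothetical helper):

    use arrow::array::{Array, ArrayRef, TimestampSecondArray};
    use std::sync::Arc;

    // What downcast_and_iter!(column, TimestampSecondArray) boils down to:
    fn raw_seconds(column: &ArrayRef) -> Vec<Option<i64>> {
        column
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap() // panics if the column has a different resolution
            .into_iter()
            .collect()
    }

    fn main() {
        let col: ArrayRef = Arc::new(TimestampSecondArray::from(vec![
            Some(1_627_846_261),
            None,
        ]));
        // Nulls come through as None, which Int64Array::from(...) keeps as nulls.
        assert_eq!(raw_seconds(&col), vec![Some(1_627_846_261), None]);
    }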
@@ -224,7 +281,8 @@ mod tests {
                 Arc::new(TimestampNanosecondArray::from(values)) as ArrayRef
             }
         };
-        let result = convert_timestamp_to_struct(&timestamp_array, *unit);
+        let result =
+            convert_timestamp_to_struct(&timestamp_array, *unit, SerializationFormat::Json);
         let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(string_array.len(), 2);
         assert_eq!(string_array.value(0), *expected);
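One caveat carried over unchanged from the old JSON path, visible in this test's expected strings: the "{}.{}" pattern prints the fractional part without zero-padding, so a value like 1.050 s at millisecond resolution renders as "1.50". A two-line demonstration (the padded variant is a hypothetical alternative, not part of this PR):

    fn main() {
        let (secs, subsec_millis): (i64, u32) = (1, 50); // 1.050 seconds
        assert_eq!(format!("{}.{}", secs, subsec_millis), "1.50"); // reads as 1.5 s
        assert_eq!(format!("{}.{:03}", secs, subsec_millis), "1.050"); // zero-padded
    }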
@@ -250,7 +308,8 @@
         ])) as ArrayRef;
         let batch = RecordBatch::try_new(schema, vec![int_array, timestamp_array]).unwrap();
         let records = vec![batch];
-        let (converted_batches, column_infos) = convert_record_batches(records).unwrap();
+        let (converted_batches, column_infos) =
+            convert_record_batches(records.clone(), SerializationFormat::Json).unwrap();

         let converted_batch = &converted_batches[0];
         assert_eq!(converted_batches.len(), 1);
@@ -270,5 +329,17 @@
         assert_eq!(column_infos[0].r#type, "fixed");
         assert_eq!(column_infos[1].name, "timestamp_col");
         assert_eq!(column_infos[1].r#type, "timestamp_ntz");
+
+        let (converted_batches, _) =
+            convert_record_batches(records, SerializationFormat::Arrow).unwrap();
+        let converted_batch = &converted_batches[0];
+        let converted_timestamp_array = converted_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(converted_timestamp_array.value(0), 1_627_846_261);
+        assert!(converted_timestamp_array.is_null(1));
+        assert_eq!(converted_timestamp_array.value(2), 1_627_846_262);
     }
 }
