Skip to content

Commit ab301c5

Browse files
authored
Merge pull request #2 from feniljain/wip-nan-val-counts
Wip nan val counts
2 parents 3ccba94 + 332a795 commit ab301c5

File tree

4 files changed

+1210
-1297
lines changed

4 files changed

+1210
-1297
lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
2020
mod schema;
2121
pub use schema::*;
22+
mod nan_val_cnt_visitor;
23+
pub use nan_val_cnt_visitor::*;
2224
mod reader;
2325
pub(crate) mod record_batch_projector;
2426
pub(crate) mod record_batch_transformer;
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use std::collections::hash_map::Entry;
2+
use std::collections::HashMap;
3+
use std::sync::Arc;
4+
5+
use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray};
6+
use arrow_schema::DataType;
7+
8+
use crate::arrow::ArrowArrayAccessor;
9+
use crate::spec::{
10+
visit_struct_with_partner, ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef,
11+
SchemaWithPartnerVisitor, StructType,
12+
};
13+
use crate::Result;
14+
15+
macro_rules! cast_and_update_cnt_map {
16+
($t:ty, $col:ident, $self:ident, $field_id:ident) => {
17+
let nan_val_cnt = $col
18+
.as_any()
19+
.downcast_ref::<$t>()
20+
.unwrap()
21+
.iter()
22+
.filter(|value| value.map_or(false, |v| v.is_nan()))
23+
.count() as u64;
24+
25+
match $self.nan_value_counts.entry($field_id) {
26+
Entry::Occupied(mut ele) => {
27+
let total_nan_val_cnt = ele.get() + nan_val_cnt;
28+
ele.insert(total_nan_val_cnt);
29+
}
30+
Entry::Vacant(v) => {
31+
v.insert(nan_val_cnt);
32+
}
33+
};
34+
};
35+
}
36+
37+
macro_rules! count_float_nans {
38+
($col:ident, $self:ident, $field_id:ident) => {
39+
match $col.data_type() {
40+
DataType::Float32 => {
41+
cast_and_update_cnt_map!(Float32Array, $col, $self, $field_id);
42+
}
43+
DataType::Float64 => {
44+
cast_and_update_cnt_map!(Float64Array, $col, $self, $field_id);
45+
}
46+
_ => {}
47+
}
48+
};
49+
}
50+
51+
/// Visitor which counts and keeps track of NaN value counts in given record batch(s)
52+
pub struct NanValueCountVisitor {
53+
/// Stores field ID to NaN value count mapping
54+
pub nan_value_counts: HashMap<i32, u64>,
55+
}
56+
57+
impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
58+
type T = ();
59+
60+
fn schema(
61+
&mut self,
62+
_schema: &Schema,
63+
_partner: &ArrayRef,
64+
_value: Self::T,
65+
) -> Result<Self::T> {
66+
Ok(())
67+
}
68+
69+
fn field(
70+
&mut self,
71+
_field: &NestedFieldRef,
72+
_partner: &ArrayRef,
73+
_value: Self::T,
74+
) -> Result<Self::T> {
75+
Ok(())
76+
}
77+
78+
fn r#struct(
79+
&mut self,
80+
_struct: &StructType,
81+
_partner: &ArrayRef,
82+
_results: Vec<Self::T>,
83+
) -> Result<Self::T> {
84+
Ok(())
85+
}
86+
87+
fn list(&mut self, _list: &ListType, _list_arr: &ArrayRef, _value: Self::T) -> Result<Self::T> {
88+
Ok(())
89+
}
90+
91+
fn map(
92+
&mut self,
93+
_map: &MapType,
94+
_partner: &ArrayRef,
95+
_key_value: Self::T,
96+
_value: Self::T,
97+
) -> Result<Self::T> {
98+
Ok(())
99+
}
100+
101+
fn primitive(&mut self, _p: &PrimitiveType, _col: &ArrayRef) -> Result<Self::T> {
102+
Ok(())
103+
}
104+
105+
fn after_struct_field(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> {
106+
let field_id = field.id;
107+
count_float_nans!(partner, self, field_id);
108+
Ok(())
109+
}
110+
111+
fn after_list_element(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> {
112+
let field_id = field.id;
113+
count_float_nans!(partner, self, field_id);
114+
Ok(())
115+
}
116+
117+
fn after_map_key(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> {
118+
let field_id = field.id;
119+
count_float_nans!(partner, self, field_id);
120+
Ok(())
121+
}
122+
123+
fn after_map_value(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> {
124+
let field_id = field.id;
125+
count_float_nans!(partner, self, field_id);
126+
Ok(())
127+
}
128+
}
129+
130+
impl NanValueCountVisitor {
131+
/// Creates new instance of NanValueCountVisitor
132+
pub fn new() -> Self {
133+
Self {
134+
nan_value_counts: HashMap::new(),
135+
}
136+
}
137+
138+
/// Compute nan value counts in given schema and record batch
139+
pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> {
140+
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
141+
142+
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
143+
visit_struct_with_partner(
144+
schema.as_struct(),
145+
&struct_arr,
146+
self,
147+
&arrow_arr_partner_accessor,
148+
)?;
149+
150+
Ok(())
151+
}
152+
}
153+
154+
impl Default for NanValueCountVisitor {
155+
fn default() -> Self {
156+
Self::new()
157+
}
158+
}

crates/iceberg/src/arrow/value.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
425425
}
426426
}
427427

428-
struct ArrowArrayAccessor;
428+
/// Partner type representing accessing and walking arrow arrays alongside iceberg schema
429+
pub struct ArrowArrayAccessor;
429430

430431
impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
431432
fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {

0 commit comments

Comments
 (0)