1717
1818//! Metadata table api.
1919
20+ use std:: collections:: HashMap ;
2021use std:: sync:: Arc ;
2122
2223use arrow_array:: builder:: {
@@ -26,6 +27,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp
2627use arrow_array:: RecordBatch ;
2728use arrow_schema:: { DataType , Field , Fields , Schema , TimeUnit } ;
2829
30+ use crate :: arrow:: { arrow_schema_to_schema, schema_to_arrow_schema} ;
2931use crate :: spec:: { ListType , NestedField , PrimitiveType , StructType , Type } ;
3032use crate :: table:: Table ;
3133use crate :: Result ;
@@ -135,101 +137,66 @@ pub struct ManifestsTable<'a> {
135137}
136138
137139impl < ' a > ManifestsTable < ' a > {
138- fn partition_summary_fields ( & self ) -> Vec < Field > {
139- vec ! [
140- Field :: new( "contains_null" , DataType :: Boolean , false ) ,
141- Field :: new( "contains_nan" , DataType :: Boolean , true ) ,
142- Field :: new( "lower_bound" , DataType :: Utf8 , true ) ,
143- Field :: new( "upper_bound" , DataType :: Utf8 , true ) ,
144- ]
145- }
146-
147- /// Returns the fields of the manifests table.
148- fn fields ( & self ) -> Vec < Field > {
149- vec ! [
150- Field :: new( "content" , DataType :: Int8 , false ) ,
151- Field :: new( "path" , DataType :: Utf8 , false ) ,
152- Field :: new( "length" , DataType :: Int64 , false ) ,
153- Field :: new( "partition_spec_id" , DataType :: Int32 , false ) ,
154- Field :: new( "added_snapshot_id" , DataType :: Int64 , false ) ,
155- Field :: new( "added_data_files_count" , DataType :: Int32 , false ) ,
156- Field :: new( "existing_data_files_count" , DataType :: Int32 , false ) ,
157- Field :: new( "deleted_data_files_count" , DataType :: Int32 , false ) ,
158- Field :: new( "added_delete_files_count" , DataType :: Int32 , false ) ,
159- Field :: new( "existing_delete_files_count" , DataType :: Int32 , false ) ,
160- Field :: new( "deleted_delete_files_count" , DataType :: Int32 , false ) ,
161- Field :: new(
162- "partition_summaries" ,
163- DataType :: List ( Arc :: new( Field :: new_struct(
164- "item" ,
165- self . partition_summary_fields( ) ,
166- false ,
167- ) ) ) ,
168- false ,
169- ) ,
170- ]
171- }
172-
173140 /// Returns the iceberg schema of the manifests table.
174141 pub fn schema ( & self ) -> crate :: spec:: Schema {
175142 let fields = vec ! [
176- NestedField :: new( 14 , "content" , Type :: Primitive ( PrimitiveType :: Int ) , false ) ,
177- NestedField :: new( 1 , "path" , Type :: Primitive ( PrimitiveType :: String ) , false ) ,
178- NestedField :: new( 2 , "length" , Type :: Primitive ( PrimitiveType :: Long ) , false ) ,
143+ NestedField :: new( 14 , "content" , Type :: Primitive ( PrimitiveType :: Int ) , true ) ,
144+ NestedField :: new( 1 , "path" , Type :: Primitive ( PrimitiveType :: String ) , true ) ,
145+ NestedField :: new( 2 , "length" , Type :: Primitive ( PrimitiveType :: Long ) , true ) ,
179146 NestedField :: new(
180147 3 ,
181148 "partition_spec_id" ,
182149 Type :: Primitive ( PrimitiveType :: Int ) ,
183- false ,
150+ true ,
184151 ) ,
185152 NestedField :: new(
186153 4 ,
187154 "added_snapshot_id" ,
188155 Type :: Primitive ( PrimitiveType :: Long ) ,
189- false ,
156+ true ,
190157 ) ,
191158 NestedField :: new(
192159 5 ,
193160 "added_data_files_count" ,
194161 Type :: Primitive ( PrimitiveType :: Int ) ,
195- false ,
162+ true ,
196163 ) ,
197164 NestedField :: new(
198165 6 ,
199166 "existing_data_files_count" ,
200167 Type :: Primitive ( PrimitiveType :: Int ) ,
201- false ,
168+ true ,
202169 ) ,
203170 NestedField :: new(
204171 7 ,
205172 "deleted_data_files_count" ,
206173 Type :: Primitive ( PrimitiveType :: Int ) ,
207- false ,
174+ true ,
208175 ) ,
209176 NestedField :: new(
210177 15 ,
211178 "added_delete_files_count" ,
212179 Type :: Primitive ( PrimitiveType :: Int ) ,
213- false ,
180+ true ,
214181 ) ,
215182 NestedField :: new(
216183 16 ,
217184 "existing_delete_files_count" ,
218185 Type :: Primitive ( PrimitiveType :: Int ) ,
219- false ,
186+ true ,
220187 ) ,
221188 NestedField :: new(
222189 17 ,
223190 "deleted_delete_files_count" ,
224191 Type :: Primitive ( PrimitiveType :: Int ) ,
225- false ,
192+ true ,
226193 ) ,
227194 NestedField :: new(
228195 8 ,
229196 "partition_summaries" ,
230197 Type :: List ( ListType {
231198 element_field: Arc :: new( NestedField :: new(
232- 0 ,
199+ 9 ,
233200 "item" ,
234201 Type :: Struct ( StructType :: new( vec![
235202 Arc :: new( NestedField :: new(
@@ -257,10 +224,10 @@ impl<'a> ManifestsTable<'a> {
257224 false ,
258225 ) ) ,
259226 ] ) ) ,
260- false ,
227+ true ,
261228 ) ) ,
262229 } ) ,
263- false ,
230+ true ,
264231 ) ,
265232 ] ;
266233
@@ -272,7 +239,20 @@ impl<'a> ManifestsTable<'a> {
272239
273240 /// Scans the manifests table.
274241 pub async fn scan ( & self ) -> Result < RecordBatch > {
275- let mut content = PrimitiveBuilder :: < Int8Type > :: new ( ) ;
242+ let schema = schema_to_arrow_schema ( & self . schema ( ) ) ?;
243+ let partition_summary_fields = if let DataType :: List ( list_type) =
244+ schema. field_with_name ( "partition_summaries" ) ?. data_type ( )
245+ {
246+ if let DataType :: Struct ( fields) = list_type. data_type ( ) {
247+ fields. to_vec ( )
248+ } else {
249+ unreachable ! ( )
250+ }
251+ } else {
252+ unreachable ! ( )
253+ } ;
254+
255+ let mut content = PrimitiveBuilder :: < Int32Type > :: new ( ) ;
276256 let mut path = StringBuilder :: new ( ) ;
277257 let mut length = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
278258 let mut partition_spec_id = PrimitiveBuilder :: < Int32Type > :: new ( ) ;
@@ -284,21 +264,21 @@ impl<'a> ManifestsTable<'a> {
284264 let mut existing_delete_files_count = PrimitiveBuilder :: < Int32Type > :: new ( ) ;
285265 let mut deleted_delete_files_count = PrimitiveBuilder :: < Int32Type > :: new ( ) ;
286266 let mut partition_summaries = ListBuilder :: new ( StructBuilder :: from_fields (
287- Fields :: from ( self . partition_summary_fields ( ) ) ,
267+ Fields :: from ( partition_summary_fields. clone ( ) ) ,
288268 0 ,
289269 ) )
290- . with_field ( Arc :: new ( Field :: new_struct (
291- "item" ,
292- self . partition_summary_fields ( ) ,
293- false ,
294- ) ) ) ;
270+ . with_field ( Arc :: new (
271+ Field :: new_struct ( "item" , partition_summary_fields , false ) . with_metadata (
272+ HashMap :: from ( [ ( "PARQUET:field_id" . to_string ( ) , "9" . to_string ( ) ) ] ) ,
273+ ) ,
274+ ) ) ;
295275
296276 if let Some ( snapshot) = self . table . metadata ( ) . current_snapshot ( ) {
297277 let manifest_list = snapshot
298278 . load_manifest_list ( self . table . file_io ( ) , & self . table . metadata_ref ( ) )
299279 . await ?;
300280 for manifest in manifest_list. entries ( ) {
301- content. append_value ( manifest. content as i8 ) ;
281+ content. append_value ( manifest. content as i32 ) ;
302282 path. append_value ( manifest. manifest_path . clone ( ) ) ;
303283 length. append_value ( manifest. manifest_length ) ;
304284 partition_spec_id. append_value ( manifest. partition_spec_id ) ;
@@ -339,7 +319,6 @@ impl<'a> ManifestsTable<'a> {
339319 }
340320 }
341321
342- let schema = Schema :: new ( self . fields ( ) ) ;
343322 Ok ( RecordBatch :: try_new ( Arc :: new ( schema) , vec ! [
344323 Arc :: new( content. finish( ) ) ,
345324 Arc :: new( path. finish( ) ) ,
@@ -499,20 +478,20 @@ mod tests {
499478 check_record_batch (
500479 record_batch,
501480 expect ! [ [ r#"
502- Field { name: "content", data_type: Int8 , nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
503- Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
504- Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
505- Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
506- Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
507- Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
508- Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
509- Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
510- Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
511- Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
512- Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
513- Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"# ] ] ,
481+ Field { name: "content", data_type: Int32 , nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14" } },
482+ Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1" } },
483+ Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2" } },
484+ Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3" } },
485+ Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4" } },
486+ Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5" } },
487+ Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6" } },
488+ Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7" } },
489+ Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15" } },
490+ Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16" } },
491+ Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17" } },
492+ Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10" } }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11" } }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12" } }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13" } }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9" } }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8" } }"# ] ] ,
514493 expect ! [ [ r#"
515- content: PrimitiveArray<Int8 >
494+ content: PrimitiveArray<Int32 >
516495 [
517496 0,
518497 ],
0 commit comments