@@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
3636use crate :: io:: FileIO ;
3737use crate :: runtime:: spawn;
3838use crate :: spec:: {
39- DataContentType , ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList , Schema ,
40- SchemaRef , SnapshotRef , TableMetadataRef ,
39+ DataContentType , DataFileFormat , ManifestContentType , ManifestEntryRef , ManifestFile ,
40+ ManifestList , Schema , SchemaRef , SnapshotRef , TableMetadataRef ,
4141} ;
4242use crate :: table:: Table ;
4343use crate :: utils:: available_parallelism;
@@ -529,14 +529,19 @@ impl ManifestEntryContext {
529529 /// created from it
530530 fn into_file_scan_task ( self ) -> FileScanTask {
531531 FileScanTask {
532- data_file_path : self . manifest_entry . file_path ( ) . to_string ( ) ,
533532 start : 0 ,
534533 length : self . manifest_entry . file_size_in_bytes ( ) ,
534+ record_count : Some ( self . manifest_entry . record_count ( ) ) ,
535+
536+ data_file_path : self . manifest_entry . file_path ( ) . to_string ( ) ,
537+ data_file_content : self . manifest_entry . content_type ( ) ,
538+ data_file_format : self . manifest_entry . file_format ( ) ,
539+
540+ schema : self . snapshot_schema ,
535541 project_field_ids : self . field_ids . to_vec ( ) ,
536542 predicate : self
537543 . bound_predicates
538544 . map ( |x| x. as_ref ( ) . snapshot_bound_predicate . clone ( ) ) ,
539- schema : self . snapshot_schema ,
540545 }
541546 }
542547}
@@ -854,35 +859,30 @@ impl ExpressionEvaluatorCache {
854859/// A task to scan part of file.
855860#[ derive( Debug , Clone , Serialize , Deserialize ) ]
856861pub struct FileScanTask {
857- data_file_path : String ,
858- start : u64 ,
859- length : u64 ,
860- project_field_ids : Vec < i32 > ,
862+ /// The start offset of the file to scan.
863+ pub start : u64 ,
864+ /// The length of the file to scan.
865+ pub length : u64 ,
866+ /// The number of records in the file to scan.
867+ ///
868+ /// This is an optional field, and only available if we are
869+ /// reading the entire data file.
870+ pub record_count : Option < u64 > ,
871+
872+ /// The data file path corresponding to the task.
873+ pub data_file_path : String ,
874+ /// The content type of the file to scan.
875+ pub data_file_content : DataContentType ,
876+ /// The format of the file to scan.
877+ pub data_file_format : DataFileFormat ,
878+
879+ /// The schema of the file to scan.
880+ pub schema : SchemaRef ,
881+ /// The field ids to project.
882+ pub project_field_ids : Vec < i32 > ,
883+ /// The predicate to filter.
861884 #[ serde( skip_serializing_if = "Option::is_none" ) ]
862- predicate : Option < BoundPredicate > ,
863- schema : SchemaRef ,
864- }
865-
866- impl FileScanTask {
867- /// Returns the data file path of this file scan task.
868- pub fn data_file_path ( & self ) -> & str {
869- & self . data_file_path
870- }
871-
872- /// Returns the project field id of this file scan task.
873- pub fn project_field_ids ( & self ) -> & [ i32 ] {
874- & self . project_field_ids
875- }
876-
877- /// Returns the predicate of this file scan task.
878- pub fn predicate ( & self ) -> Option < & BoundPredicate > {
879- self . predicate . as_ref ( )
880- }
881-
882- /// Returns the schema id of this file scan task.
883- pub fn schema ( & self ) -> & Schema {
884- & self . schema
885- }
885+ pub predicate : Option < BoundPredicate > ,
886886}
887887
888888#[ cfg( test) ]
@@ -1219,17 +1219,17 @@ mod tests {
12191219
12201220 assert_eq ! ( tasks. len( ) , 2 ) ;
12211221
1222- tasks. sort_by_key ( |t| t. data_file_path ( ) . to_string ( ) ) ;
1222+ tasks. sort_by_key ( |t| t. data_file_path . to_string ( ) ) ;
12231223
12241224 // Check first task is added data file
12251225 assert_eq ! (
1226- tasks[ 0 ] . data_file_path( ) ,
1226+ tasks[ 0 ] . data_file_path,
12271227 format!( "{}/1.parquet" , & fixture. table_location)
12281228 ) ;
12291229
12301230 // Check second task is existing data file
12311231 assert_eq ! (
1232- tasks[ 1 ] . data_file_path( ) ,
1232+ tasks[ 1 ] . data_file_path,
12331233 format!( "{}/3.parquet" , & fixture. table_location)
12341234 ) ;
12351235 }
@@ -1582,22 +1582,28 @@ mod tests {
15821582 ) ;
15831583 let task = FileScanTask {
15841584 data_file_path : "data_file_path" . to_string ( ) ,
1585+ data_file_content : DataContentType :: Data ,
15851586 start : 0 ,
15861587 length : 100 ,
15871588 project_field_ids : vec ! [ 1 , 2 , 3 ] ,
15881589 predicate : None ,
15891590 schema : schema. clone ( ) ,
1591+ record_count : Some ( 100 ) ,
1592+ data_file_format : DataFileFormat :: Parquet ,
15901593 } ;
15911594 test_fn ( task) ;
15921595
15931596 // with predicate
15941597 let task = FileScanTask {
15951598 data_file_path : "data_file_path" . to_string ( ) ,
1599+ data_file_content : DataContentType :: Data ,
15961600 start : 0 ,
15971601 length : 100 ,
15981602 project_field_ids : vec ! [ 1 , 2 , 3 ] ,
15991603 predicate : Some ( BoundPredicate :: AlwaysTrue ) ,
16001604 schema,
1605+ record_count : None ,
1606+ data_file_format : DataFileFormat :: Avro ,
16011607 } ;
16021608 test_fn ( task) ;
16031609 }
0 commit comments