diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 391239cf6e..c10264bcd6 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -125,7 +125,7 @@ impl ArrowReader {
         Ok(try_stream! {
             while let Some(Ok(task)) = tasks.next().await {
                 let parquet_file = file_io
-                    .new_input(task.data().data_file().file_path())?;
+                    .new_input(task.data_file_path())?;
                 let (parquet_metadata, parquet_reader) =
                     try_join!(parquet_file.metadata(), parquet_file.reader())?;
                 let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 17a94d4126..b7e0b3be08 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -45,7 +45,7 @@ mod avro;
 pub mod io;
 pub mod spec;
 
-mod scan;
+pub mod scan;
 
 #[allow(dead_code)]
 pub mod expr;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 6641d6775d..397633d5e9 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -24,8 +24,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::spec::{
-    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
-    SnapshotRef, TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
+    TableMetadataRef,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -33,6 +33,7 @@ use arrow_array::RecordBatch;
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use serde::{Deserialize, Serialize};
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -55,7 +56,7 @@ pub struct TableScanBuilder<'a> {
 }
 
 impl<'a> TableScanBuilder<'a> {
-    pub fn new(table: &'a Table) -> Self {
+    pub(crate) fn new(table: &'a Table) -> Self {
         Self {
             table,
             column_names: vec![],
@@ -265,7 +266,7 @@ impl TableScan {
                     }
                     DataContentType::Data => {
                         let scan_task: Result<FileScanTask> = Ok(FileScanTask {
-                            data_manifest_entry: manifest_entry.clone(),
+                            data_file_path: manifest_entry.data_file().file_path().to_string(),
                             start: 0,
                             length: manifest_entry.file_size_in_bytes(),
                         });
@@ -463,9 +464,9 @@ impl ManifestEvaluatorCache {
 }
 
 /// A task to scan part of file.
-#[derive(Debug)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct FileScanTask {
-    data_manifest_entry: ManifestEntryRef,
+    data_file_path: String,
     #[allow(dead_code)]
     start: u64,
     #[allow(dead_code)]
@@ -473,8 +474,9 @@ pub struct FileScanTask {
 }
 
 impl FileScanTask {
-    pub fn data(&self) -> ManifestEntryRef {
-        self.data_manifest_entry.clone()
+    /// Returns the data file path of this file scan task.
+    pub fn data_file_path(&self) -> &str {
+        &self.data_file_path
     }
 }
 
@@ -794,17 +796,17 @@ mod tests {
 
         assert_eq!(tasks.len(), 2);
 
-        tasks.sort_by_key(|t| t.data().data_file().file_path().to_string());
+        tasks.sort_by_key(|t| t.data_file_path().to_string());
 
         // Check first task is added data file
         assert_eq!(
-            tasks[0].data().data_file().file_path(),
+            tasks[0].data_file_path(),
             format!("{}/1.parquet", &fixture.table_location)
        );
 
         // Check second task is existing data file
         assert_eq!(
-            tasks[1].data().data_file().file_path(),
+            tasks[1].data_file_path(),
             format!("{}/3.parquet", &fixture.table_location)
         );
     }
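
For context on why `FileScanTask` gains `Clone`, `Serialize`, and `Deserialize` here: the task is now a plain value (a data file path plus a byte range) instead of a holder of a `ManifestEntryRef`, so it can be handed across serialization or process boundaries. Below is a minimal sketch of that usage, not part of this diff; it assumes `serde_json` is available in the consuming crate, and `ship_task` is a hypothetical helper, not an API from this repository.

```rust
use iceberg::scan::FileScanTask;

// Hypothetical helper: round-trip a FileScanTask through JSON, as a remote
// worker might receive it. Only relies on the derives and accessor added here.
fn ship_task(task: &FileScanTask) -> serde_json::Result<FileScanTask> {
    // Serialize the task (data file path plus byte range) to a JSON payload.
    let payload = serde_json::to_string(task)?;
    // Rebuild it on the other side without needing any manifest state.
    let remote: FileScanTask = serde_json::from_str(&payload)?;
    // The data file path survives the round trip.
    assert_eq!(task.data_file_path(), remote.data_file_path());
    Ok(remote)
}
```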