diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f1cb86ab38..3c9dbc1cab 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{ - DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile, - ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef, + DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, Schema, + SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::utils::available_parallelism; @@ -405,15 +405,6 @@ impl TableScan { return Ok(()); } - // abort the plan if we encounter a manifest entry whose data file's - // content type is currently unsupported - if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Only Data files currently supported", - )); - } - if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates { let BoundPredicates { ref snapshot_bound_predicate, @@ -542,6 +533,8 @@ impl ManifestEntryContext { predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), + sequence_number: self.manifest_entry.sequence_number().unwrap_or(0), + equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(), } } } @@ -580,15 +573,10 @@ impl PlanContext { manifest_list: Arc, sender: Sender, ) -> Result>>> { - let filtered_entries = manifest_list - .entries() - .iter() - .filter(|manifest_file| manifest_file.content == ManifestContentType::Data); - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; if self.predicate.is_some() { - for manifest_file in filtered_entries { + for manifest_file in manifest_list.entries().iter() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip @@ -610,7 +598,7 @@ impl PlanContext { } } } else { - for manifest_file in filtered_entries { + for manifest_file in manifest_list.entries().iter() { let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); filtered_mfcs.push(Ok(mfc)); } @@ -883,6 +871,10 @@ pub struct FileScanTask { /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, + /// The `sequence_number` of the task. + pub sequence_number: i64, + /// The `equality_ids` of the task. + pub equality_ids: Vec, } #[cfg(test)] @@ -1590,6 +1582,8 @@ mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); @@ -1604,6 +1598,8 @@ mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); }