Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
Expand Down Expand Up @@ -405,15 +405,6 @@ impl TableScan {
return Ok(());
}

// abort the plan if we encounter a manifest entry whose data file's
// content type is currently unsupported
if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Only Data files currently supported",
));
}

if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let BoundPredicates {
ref snapshot_bound_predicate,
Expand Down Expand Up @@ -542,6 +533,8 @@ impl ManifestEntryContext {
predicate: self
.bound_predicates
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
}
}
}
Expand Down Expand Up @@ -580,15 +573,10 @@ impl PlanContext {
manifest_list: Arc<ManifestList>,
sender: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let filtered_entries = manifest_list
.entries()
.iter()
.filter(|manifest_file| manifest_file.content == ManifestContentType::Data);

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];
if self.predicate.is_some() {
for manifest_file in filtered_entries {
for manifest_file in manifest_list.entries().iter() {
let partition_bound_predicate = self.get_partition_filter(manifest_file)?;

// evaluate the ManifestFile against the partition filter. Skip
Expand All @@ -610,7 +598,7 @@ impl PlanContext {
}
}
} else {
for manifest_file in filtered_entries {
for manifest_file in manifest_list.entries().iter() {
let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
filtered_mfcs.push(Ok(mfc));
}
Expand Down Expand Up @@ -883,6 +871,10 @@ pub struct FileScanTask {
/// The predicate to filter.
#[serde(skip_serializing_if = "Option::is_none")]
pub predicate: Option<BoundPredicate>,
/// The `sequence_number` of the task.
pub sequence_number: i64,
/// The `equality_ids` of the task.
pub equality_ids: Vec<i32>,
}

#[cfg(test)]
Expand Down Expand Up @@ -1590,6 +1582,8 @@ mod tests {
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
sequence_number: 0,
equality_ids: vec![],
};
test_fn(task);

Expand All @@ -1604,6 +1598,8 @@ mod tests {
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
sequence_number: 0,
equality_ids: vec![],
};
test_fn(task);
}
Expand Down