From cfd83eda5d5e4b846888e77cd4fe9b9d757f234b Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 12:58:24 +0200 Subject: [PATCH 1/8] refactor: add partition_schema_cache --- crates/iceberg/src/scan.rs | 98 +++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index b842522e2f..5965c4d942 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{ - DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, PartitionSpecRef, Schema, - SchemaRef, SnapshotRef, TableMetadataRef, + DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef, + SnapshotRef, TableMetadata, TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -189,6 +189,7 @@ impl TableScan { self.case_sensitive, )?; + let mut partition_schema_cache = PartitionSchemaCache::new(); let mut partition_filter_cache = PartitionFilterCache::new(); let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); @@ -206,12 +207,15 @@ impl TableScan { if let Some(filter) = context.bound_filter() { let partition_spec_id = entry.partition_spec_id; - let (partition_spec, partition_schema) = - context.create_partition_spec_and_schema(partition_spec_id)?; + let partition_schema = partition_schema_cache.get( + partition_spec_id, + &context.table_metadata, + &context.schema + )?; let partition_filter = partition_filter_cache.get( partition_spec_id, - partition_spec, + &context.table_metadata, partition_schema.clone(), filter, context.case_sensitive, @@ -362,32 +366,6 @@ impl FileScanStreamContext { fn bound_filter(&self) -> Option<&BoundPredicate> { self.bound_filter.as_ref() } - - /// Creates a reference-counted [`PartitionSpec`] and a - /// corresponding [`Schema`] based on the specified partition spec id. - fn create_partition_spec_and_schema( - &self, - spec_id: i32, - ) -> Result<(PartitionSpecRef, SchemaRef)> { - let partition_spec = - self.table_metadata - .partition_spec_by_id(spec_id) - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Could not find partition spec for id {}", spec_id), - ))?; - - let partition_type = partition_spec.partition_type(&self.schema)?; - let partition_fields = partition_type.fields().to_owned(); - let partition_schema = Arc::new( - Schema::builder() - .with_schema_id(partition_spec.spec_id) - .with_fields(partition_fields) - .build()?, - ); - - Ok((partition_spec.clone(), partition_schema)) - } } #[derive(Debug)] @@ -407,7 +385,7 @@ impl PartitionFilterCache { fn get( &mut self, spec_id: i32, - partition_spec: PartitionSpecRef, + table_metadata: &TableMetadata, partition_schema: SchemaRef, filter: &BoundPredicate, case_sensitive: bool, @@ -415,7 +393,15 @@ impl PartitionFilterCache { match self.0.entry(spec_id) { Entry::Occupied(e) => Ok(e.into_mut()), Entry::Vacant(e) => { - let mut inclusive_projection = InclusiveProjection::new(partition_spec); + let partition_spec = + table_metadata + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Could not find partition spec for id {}", spec_id), + ))?; + + let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); let partition_filter = inclusive_projection .project(filter)? @@ -428,6 +414,52 @@ impl PartitionFilterCache { } } +#[derive(Debug)] +/// Manages the caching of partition [`Schema`]s +/// for [`PartitionSpec`]s based on partition spec id. +struct PartitionSchemaCache(HashMap); + +impl PartitionSchemaCache { + /// Creates a new [`PartitionSchemaCache`] + /// with an empty internal HashMap. + fn new() -> Self { + Self(HashMap::new()) + } + + /// Retrieves a partition [`SchemaRef`] from the cache + /// or computes it if not present. + fn get( + &mut self, + spec_id: i32, + table_metadata: &TableMetadata, + schema: &Schema, + ) -> Result { + match self.0.entry(spec_id) { + Entry::Occupied(e) => Ok(e.get().clone()), + Entry::Vacant(e) => { + let partition_spec = + table_metadata + .partition_spec_by_id(spec_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Could not find partition spec for id {}", spec_id), + ))?; + + let partition_type = partition_spec.partition_type(schema)?; + let partition_fields = partition_type.fields().to_owned(); + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); + + Ok(e.insert(partition_schema).clone()) + } + } + } +} + #[derive(Debug)] /// Manages the caching of [`ManifestEvaluator`] objects /// for [`PartitionSpec`]s based on partition spec id. From 4f60fe039eb8472c3aaff977afa279c6039ad1bc Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 13:36:45 +0200 Subject: [PATCH 2/8] refactor: use context as param object --- crates/iceberg/src/scan.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5965c4d942..3c80d16c87 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -24,7 +24,7 @@ use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::spec::{ DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef, - SnapshotRef, TableMetadata, TableMetadataRef, + SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -209,20 +209,18 @@ impl TableScan { let partition_schema = partition_schema_cache.get( partition_spec_id, - &context.table_metadata, - &context.schema + &context )?; let partition_filter = partition_filter_cache.get( partition_spec_id, - &context.table_metadata, + &context, partition_schema.clone(), filter, - context.case_sensitive, )?; let manifest_evaluator = manifest_evaluator_cache.get( - partition_schema.schema_id(), + partition_spec_id, partition_filter.clone(), context.case_sensitive, ); @@ -385,16 +383,16 @@ impl PartitionFilterCache { fn get( &mut self, spec_id: i32, - table_metadata: &TableMetadata, + context: &FileScanStreamContext, partition_schema: SchemaRef, filter: &BoundPredicate, - case_sensitive: bool, ) -> Result<&BoundPredicate> { match self.0.entry(spec_id) { Entry::Occupied(e) => Ok(e.into_mut()), Entry::Vacant(e) => { let partition_spec = - table_metadata + context + .table_metadata .partition_spec_by_id(spec_id) .ok_or(Error::new( ErrorKind::Unexpected, @@ -406,7 +404,7 @@ impl PartitionFilterCache { let partition_filter = inclusive_projection .project(filter)? .rewrite_not() - .bind(partition_schema, case_sensitive)?; + .bind(partition_schema, context.case_sensitive)?; Ok(e.insert(partition_filter)) } @@ -428,24 +426,20 @@ impl PartitionSchemaCache { /// Retrieves a partition [`SchemaRef`] from the cache /// or computes it if not present. - fn get( - &mut self, - spec_id: i32, - table_metadata: &TableMetadata, - schema: &Schema, - ) -> Result { + fn get(&mut self, spec_id: i32, context: &FileScanStreamContext) -> Result { match self.0.entry(spec_id) { Entry::Occupied(e) => Ok(e.get().clone()), Entry::Vacant(e) => { let partition_spec = - table_metadata + context + .table_metadata .partition_spec_by_id(spec_id) .ok_or(Error::new( ErrorKind::Unexpected, format!("Could not find partition spec for id {}", spec_id), ))?; - let partition_type = partition_spec.partition_type(schema)?; + let partition_type = partition_spec.partition_type(context.schema.as_ref())?; let partition_fields = partition_type.fields().to_owned(); let partition_schema = Arc::new( Schema::builder() From f60a8ac512af57f483209f6ceee8793c97843754 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 14:06:44 +0200 Subject: [PATCH 3/8] fix: test setup --- crates/iceberg/src/expr/visitors/manifest_evaluator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index bcb5967186..5c51fade7a 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -310,7 +310,7 @@ mod test { fn create_partition_schema( partition_spec: &PartitionSpecRef, - schema: &SchemaRef, + schema: &Schema, ) -> Result { let partition_type = partition_spec.partition_type(schema)?; From 00d5750c8d67e90add85aa514f32dc70b110e44a Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 14:09:47 +0200 Subject: [PATCH 4/8] refactor: clone only when cache miss --- crates/iceberg/src/scan.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 3c80d16c87..242214896f 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -215,13 +215,13 @@ impl TableScan { let partition_filter = partition_filter_cache.get( partition_spec_id, &context, - partition_schema.clone(), + &partition_schema, filter, )?; let manifest_evaluator = manifest_evaluator_cache.get( partition_spec_id, - partition_filter.clone(), + partition_filter, context.case_sensitive, ); @@ -384,7 +384,7 @@ impl PartitionFilterCache { &mut self, spec_id: i32, context: &FileScanStreamContext, - partition_schema: SchemaRef, + partition_schema: &SchemaRef, filter: &BoundPredicate, ) -> Result<&BoundPredicate> { match self.0.entry(spec_id) { @@ -404,7 +404,7 @@ impl PartitionFilterCache { let partition_filter = inclusive_projection .project(filter)? .rewrite_not() - .bind(partition_schema, context.case_sensitive)?; + .bind(partition_schema.clone(), context.case_sensitive)?; Ok(e.insert(partition_filter)) } @@ -471,12 +471,13 @@ impl ManifestEvaluatorCache { fn get( &mut self, spec_id: i32, - partition_filter: BoundPredicate, + partition_filter: &BoundPredicate, case_sensitive: bool, ) -> &mut ManifestEvaluator { - self.0 - .entry(spec_id) - .or_insert(ManifestEvaluator::new(partition_filter, case_sensitive)) + self.0.entry(spec_id).or_insert(ManifestEvaluator::new( + partition_filter.clone(), + case_sensitive, + )) } } From a736496cf8b8f169fe153054323e68f95709baf7 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 14:57:41 +0200 Subject: [PATCH 5/8] chore: move derive stmts --- crates/iceberg/src/expr/visitors/manifest_evaluator.rs | 2 +- crates/iceberg/src/scan.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index 5c51fade7a..e9239aede5 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -21,12 +21,12 @@ use crate::spec::{Datum, FieldSummary, ManifestFile}; use crate::Result; use fnv::FnvHashSet; -#[derive(Debug)] /// Evaluates a [`ManifestFile`] to see if the partition summaries /// match a provided [`BoundPredicate`]. /// /// Used by [`TableScan`] to prune the list of [`ManifestFile`]s /// in which data might be found that matches the TableScan's filter. +#[derive(Debug)] pub(crate) struct ManifestEvaluator { partition_filter: BoundPredicate, case_sensitive: bool, diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 242214896f..7ca5c41495 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -323,9 +323,9 @@ impl TableScan { } } -#[derive(Debug)] /// Holds the context necessary for file scanning operations /// in a streaming environment. +#[derive(Debug)] struct FileScanStreamContext { schema: SchemaRef, snapshot: SnapshotRef, @@ -366,9 +366,9 @@ impl FileScanStreamContext { } } -#[derive(Debug)] /// Manages the caching of [`BoundPredicate`] objects /// for [`PartitionSpec`]s based on partition spec id. +#[derive(Debug)] struct PartitionFilterCache(HashMap); impl PartitionFilterCache { @@ -412,9 +412,9 @@ impl PartitionFilterCache { } } -#[derive(Debug)] /// Manages the caching of partition [`Schema`]s /// for [`PartitionSpec`]s based on partition spec id. +#[derive(Debug)] struct PartitionSchemaCache(HashMap); impl PartitionSchemaCache { @@ -454,9 +454,9 @@ impl PartitionSchemaCache { } } -#[derive(Debug)] /// Manages the caching of [`ManifestEvaluator`] objects /// for [`PartitionSpec`]s based on partition spec id. +#[derive(Debug)] struct ManifestEvaluatorCache(HashMap); impl ManifestEvaluatorCache { From 0f6da9061a4f156a94e53da62f8b96f4431a9717 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Tue, 30 Apr 2024 20:36:34 +0200 Subject: [PATCH 6/8] refactor: remove unused case_sensitive parameter --- .../src/expr/visitors/manifest_evaluator.rs | 10 +++------- crates/iceberg/src/scan.rs | 15 ++++----------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index e9239aede5..fd2ebddba7 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -29,15 +29,11 @@ use fnv::FnvHashSet; #[derive(Debug)] pub(crate) struct ManifestEvaluator { partition_filter: BoundPredicate, - case_sensitive: bool, } impl ManifestEvaluator { - pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) -> Self { - Self { - partition_filter, - case_sensitive, - } + pub(crate) fn new(partition_filter: BoundPredicate) -> Self { + Self { partition_filter } } /// Evaluate this `ManifestEvaluator`'s filter predicate against the @@ -356,7 +352,7 @@ mod test { case_sensitive, )?; - Ok(ManifestEvaluator::new(partition_filter, case_sensitive)) + Ok(ManifestEvaluator::new(partition_filter)) } #[test] diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 7ca5c41495..0b8aeeb35a 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -222,7 +222,6 @@ impl TableScan { let manifest_evaluator = manifest_evaluator_cache.get( partition_spec_id, partition_filter, - context.case_sensitive, ); if !manifest_evaluator.eval(entry)? { @@ -468,16 +467,10 @@ impl ManifestEvaluatorCache { /// Retrieves a [`ManifestEvaluator`] from the cache /// or computes it if not present. - fn get( - &mut self, - spec_id: i32, - partition_filter: &BoundPredicate, - case_sensitive: bool, - ) -> &mut ManifestEvaluator { - self.0.entry(spec_id).or_insert(ManifestEvaluator::new( - partition_filter.clone(), - case_sensitive, - )) + fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut ManifestEvaluator { + self.0 + .entry(spec_id) + .or_insert(ManifestEvaluator::new(partition_filter.clone())) } } From 196c7a8175f76d0a4857ab918c62439ea1c98fc8 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Wed, 1 May 2024 06:16:54 +0200 Subject: [PATCH 7/8] refactor: remove partition_schema_cache --- crates/iceberg/src/scan.rs | 61 +++++++------------------------------- 1 file changed, 10 insertions(+), 51 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 0b8aeeb35a..27f935835d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -189,7 +189,6 @@ impl TableScan { self.case_sensitive, )?; - let mut partition_schema_cache = PartitionSchemaCache::new(); let mut partition_filter_cache = PartitionFilterCache::new(); let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); @@ -204,18 +203,12 @@ impl TableScan { continue; } - if let Some(filter) = context.bound_filter() { - let partition_spec_id = entry.partition_spec_id; - - let partition_schema = partition_schema_cache.get( - partition_spec_id, - &context - )?; + let partition_spec_id = entry.partition_spec_id; + if let Some(filter) = context.bound_filter() { let partition_filter = partition_filter_cache.get( partition_spec_id, &context, - &partition_schema, filter, )?; @@ -383,51 +376,10 @@ impl PartitionFilterCache { &mut self, spec_id: i32, context: &FileScanStreamContext, - partition_schema: &SchemaRef, filter: &BoundPredicate, ) -> Result<&BoundPredicate> { match self.0.entry(spec_id) { Entry::Occupied(e) => Ok(e.into_mut()), - Entry::Vacant(e) => { - let partition_spec = - context - .table_metadata - .partition_spec_by_id(spec_id) - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Could not find partition spec for id {}", spec_id), - ))?; - - let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); - - let partition_filter = inclusive_projection - .project(filter)? - .rewrite_not() - .bind(partition_schema.clone(), context.case_sensitive)?; - - Ok(e.insert(partition_filter)) - } - } - } -} - -/// Manages the caching of partition [`Schema`]s -/// for [`PartitionSpec`]s based on partition spec id. -#[derive(Debug)] -struct PartitionSchemaCache(HashMap); - -impl PartitionSchemaCache { - /// Creates a new [`PartitionSchemaCache`] - /// with an empty internal HashMap. - fn new() -> Self { - Self(HashMap::new()) - } - - /// Retrieves a partition [`SchemaRef`] from the cache - /// or computes it if not present. - fn get(&mut self, spec_id: i32, context: &FileScanStreamContext) -> Result { - match self.0.entry(spec_id) { - Entry::Occupied(e) => Ok(e.get().clone()), Entry::Vacant(e) => { let partition_spec = context @@ -447,7 +399,14 @@ impl PartitionSchemaCache { .build()?, ); - Ok(e.insert(partition_schema).clone()) + let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); + + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema.clone(), context.case_sensitive)?; + + Ok(e.insert(partition_filter)) } } } From b9b9c009ff5d32f97dc33b8ab26d8b0b9809e77b Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Wed, 1 May 2024 06:28:09 +0200 Subject: [PATCH 8/8] refactor: move partition_filter into wider scope --- crates/iceberg/src/scan.rs | 58 ++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 27f935835d..c2a5e1b2d4 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -205,13 +205,12 @@ impl TableScan { let partition_spec_id = entry.partition_spec_id; - if let Some(filter) = context.bound_filter() { - let partition_filter = partition_filter_cache.get( - partition_spec_id, - &context, - filter, - )?; + let partition_filter = partition_filter_cache.get( + partition_spec_id, + &context, + )?; + if let Some(partition_filter) = partition_filter { let manifest_evaluator = manifest_evaluator_cache.get( partition_spec_id, partition_filter, @@ -220,8 +219,6 @@ impl TableScan { if !manifest_evaluator.eval(entry)? { continue; } - - // TODO: Create ExpressionEvaluator } let manifest = entry.load_manifest(&context.file_io).await?; @@ -376,13 +373,13 @@ impl PartitionFilterCache { &mut self, spec_id: i32, context: &FileScanStreamContext, - filter: &BoundPredicate, - ) -> Result<&BoundPredicate> { - match self.0.entry(spec_id) { - Entry::Occupied(e) => Ok(e.into_mut()), - Entry::Vacant(e) => { - let partition_spec = - context + ) -> Result> { + match context.bound_filter() { + None => Ok(None), + Some(filter) => match self.0.entry(spec_id) { + Entry::Occupied(e) => Ok(Some(e.into_mut())), + Entry::Vacant(e) => { + let partition_spec = context .table_metadata .partition_spec_by_id(spec_id) .ok_or(Error::new( @@ -390,24 +387,25 @@ impl PartitionFilterCache { format!("Could not find partition spec for id {}", spec_id), ))?; - let partition_type = partition_spec.partition_type(context.schema.as_ref())?; - let partition_fields = partition_type.fields().to_owned(); - let partition_schema = Arc::new( - Schema::builder() - .with_schema_id(partition_spec.spec_id) - .with_fields(partition_fields) - .build()?, - ); + let partition_type = partition_spec.partition_type(context.schema.as_ref())?; + let partition_fields = partition_type.fields().to_owned(); + let partition_schema = Arc::new( + Schema::builder() + .with_schema_id(partition_spec.spec_id) + .with_fields(partition_fields) + .build()?, + ); - let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); + let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone()); - let partition_filter = inclusive_projection - .project(filter)? - .rewrite_not() - .bind(partition_schema.clone(), context.case_sensitive)?; + let partition_filter = inclusive_projection + .project(filter)? + .rewrite_not() + .bind(partition_schema.clone(), context.case_sensitive)?; - Ok(e.insert(partition_filter)) - } + Ok(Some(e.insert(partition_filter))) + } + }, } } }