Skip to content

Commit c696a3f

Browse files
committed
feat: adds ObjectCache, to cache Manifests and ManifestLists
1 parent 8eef484 commit c696a3f

File tree

11 files changed

+357
-67
lines changed

11 files changed

+357
-67
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,12 @@ impl Catalog for GlueCatalog {
381381

382382
builder.send().await.map_err(from_aws_sdk_error)?;
383383

384-
let table = Table::builder()
384+
Table::builder()
385385
.file_io(self.file_io())
386386
.metadata_location(metadata_location)
387387
.metadata(metadata)
388388
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
389-
.build();
390-
391-
Ok(table)
389+
.build()
392390
}
393391

394392
/// Loads a table from the Glue Catalog and constructs a `Table` object
@@ -432,17 +430,15 @@ impl Catalog for GlueCatalog {
432430
let metadata_content = input_file.read().await?;
433431
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
434432

435-
let table = Table::builder()
433+
Table::builder()
436434
.file_io(self.file_io())
437435
.metadata_location(metadata_location)
438436
.metadata(metadata)
439437
.identifier(TableIdent::new(
440438
NamespaceIdent::new(db_name),
441439
table_name.to_owned(),
442440
))
443-
.build();
444-
445-
Ok(table)
441+
.build()
446442
}
447443
}
448444
}

crates/catalog/hms/src/catalog.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -369,14 +369,12 @@ impl Catalog for HmsCatalog {
369369
.await
370370
.map_err(from_thrift_error)?;
371371

372-
let table = Table::builder()
372+
Table::builder()
373373
.file_io(self.file_io())
374374
.metadata_location(metadata_location)
375375
.metadata(metadata)
376376
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
377-
.build();
378-
379-
Ok(table)
377+
.build()
380378
}
381379

382380
/// Loads a table from the Hive Metastore and constructs a `Table` object
@@ -407,17 +405,15 @@ impl Catalog for HmsCatalog {
407405
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
408406
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
409407

410-
let table = Table::builder()
408+
Table::builder()
411409
.file_io(self.file_io())
412410
.metadata_location(metadata_location)
413411
.metadata(metadata)
414412
.identifier(TableIdent::new(
415413
NamespaceIdent::new(db_name),
416414
table.name.clone(),
417415
))
418-
.build();
419-
420-
Ok(table)
416+
.build()
421417
}
422418

423419
/// Asynchronously drops a table from the database.

crates/catalog/memory/src/catalog.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,12 @@ impl Catalog for MemoryCatalog {
195195

196196
root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
197197

198-
let table = Table::builder()
198+
Table::builder()
199199
.file_io(self.file_io.clone())
200200
.metadata_location(metadata_location)
201201
.metadata(metadata)
202202
.identifier(table_ident)
203-
.build();
204-
205-
Ok(table)
203+
.build()
206204
}
207205

208206
/// Load table from the catalog.
@@ -213,14 +211,13 @@ impl Catalog for MemoryCatalog {
213211
let input_file = self.file_io.new_input(metadata_location)?;
214212
let metadata_content = input_file.read().await?;
215213
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
216-
let table = Table::builder()
214+
215+
Table::builder()
217216
.file_io(self.file_io.clone())
218217
.metadata_location(metadata_location.clone())
219218
.metadata(metadata)
220219
.identifier(table_ident.clone())
221-
.build();
222-
223-
Ok(table)
220+
.build()
224221
}
225222

226223
/// Drop a table from the catalog.

crates/catalog/rest/src/catalog.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ impl Catalog for RestCatalog {
516516
.load_file_io(resp.metadata_location.as_deref(), resp.config)
517517
.await?;
518518

519-
let table = Table::builder()
519+
Table::builder()
520520
.identifier(table_ident)
521521
.file_io(file_io)
522522
.metadata(resp.metadata)
@@ -526,9 +526,7 @@ impl Catalog for RestCatalog {
526526
"Metadata location missing in create table response!",
527527
)
528528
})?)
529-
.build();
530-
531-
Ok(table)
529+
.build()
532530
}
533531

534532
/// Load table from the catalog.
@@ -560,9 +558,9 @@ impl Catalog for RestCatalog {
560558
.metadata(resp.metadata);
561559

562560
if let Some(metadata_location) = resp.metadata_location {
563-
Ok(table_builder.metadata_location(metadata_location).build())
561+
table_builder.metadata_location(metadata_location).build()
564562
} else {
565-
Ok(table_builder.build())
563+
table_builder.build()
566564
}
567565
}
568566

@@ -661,12 +659,12 @@ impl Catalog for RestCatalog {
661659
let file_io = self
662660
.load_file_io(Some(&resp.metadata_location), None)
663661
.await?;
664-
Ok(Table::builder()
662+
Table::builder()
665663
.identifier(commit.identifier().clone())
666664
.file_io(file_io)
667665
.metadata(resp.metadata)
668666
.metadata_location(resp.metadata_location)
669-
.build())
667+
.build()
670668
}
671669
}
672670

@@ -1661,6 +1659,7 @@ mod tests {
16611659
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
16621660
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
16631661
.build()
1662+
.unwrap()
16641663
};
16651664

16661665
let table = Transaction::new(&table1)
@@ -1785,6 +1784,7 @@ mod tests {
17851784
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
17861785
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
17871786
.build()
1787+
.unwrap()
17881788
};
17891789

17901790
let table_result = Transaction::new(&table1)

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ derive_builder = { workspace = true }
6060
fnv = { workspace = true }
6161
futures = { workspace = true }
6262
itertools = { workspace = true }
63+
moka = { version = "0.12.8", features = ["future"] }
6364
murmur3 = { workspace = true }
6465
num_cpus = { workspace = true }
6566
once_cell = { workspace = true }

crates/iceberg/src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ use storage_memory::*;
7878
mod storage_s3;
7979
#[cfg(feature = "storage-s3")]
8080
pub use storage_s3::*;
81+
pub(crate) mod object_cache;
8182
#[cfg(feature = "storage-fs")]
8283
mod storage_fs;
84+
8385
#[cfg(feature = "storage-fs")]
8486
use storage_fs::*;
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use crate::io::FileIO;
21+
use crate::spec::{
22+
FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
23+
};
24+
use crate::{Error, ErrorKind, Result};
25+
26+
const DEFAULT_CACHE_SIZE_BYTES: u64 = 2 ^ 15; // 32MB
27+
28+
#[derive(Clone, Debug)]
29+
pub(crate) enum CachedItem {
30+
ManifestList(Arc<ManifestList>),
31+
Manifest(Arc<Manifest>),
32+
}
33+
34+
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
35+
pub(crate) enum CachedObjectKey {
36+
ManifestList((String, FormatVersion, SchemaId)),
37+
Manifest(String),
38+
}
39+
40+
/// Caches metadata objects deserialized from immutable files
41+
#[derive(Clone, Debug)]
42+
pub struct ObjectCache {
43+
cache: moka::future::Cache<CachedObjectKey, CachedItem>,
44+
file_io: FileIO,
45+
cache_disabled: bool,
46+
}
47+
48+
impl ObjectCache {
49+
/// Creates a new [`ObjectCache`]
50+
/// with the default cache size
51+
pub(crate) fn new(file_io: FileIO) -> Self {
52+
Self::new_with_cache_size(file_io, DEFAULT_CACHE_SIZE_BYTES)
53+
}
54+
55+
/// Creates a new [`ObjectCache`]
56+
/// with a specific cache size
57+
pub(crate) fn new_with_cache_size(file_io: FileIO, cache_size_bytes: u64) -> Self {
58+
if cache_size_bytes == 0 {
59+
Self::with_disabled_cache(file_io)
60+
} else {
61+
Self {
62+
cache: moka::future::Cache::new(cache_size_bytes),
63+
file_io,
64+
cache_disabled: false,
65+
}
66+
}
67+
}
68+
69+
/// Creates a new [`ObjectCache`]
70+
/// with caching disabled
71+
pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
72+
Self {
73+
cache: moka::future::Cache::new(0),
74+
file_io,
75+
cache_disabled: true,
76+
}
77+
}
78+
79+
/// Retrieves an Arc [`Manifest`] from the cache
80+
/// or retrieves one from FileIO and parses it if not present
81+
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
82+
if self.cache_disabled {
83+
return manifest_file
84+
.load_manifest(&self.file_io)
85+
.await
86+
.map(Arc::new);
87+
}
88+
89+
let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone());
90+
91+
let cache_entry = self
92+
.cache
93+
.entry_by_ref(&key)
94+
.or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
95+
.await
96+
.map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
97+
.into_value();
98+
99+
match cache_entry {
100+
CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
101+
_ => Err(Error::new(
102+
ErrorKind::Unexpected,
103+
format!("cached object for key '{:?}' is not a Manifest", key),
104+
)),
105+
}
106+
}
107+
108+
/// Retrieves an Arc [`ManifestList`] from the cache
109+
/// or retrieves one from FileIO and parses it if not present
110+
pub(crate) async fn get_manifest_list(
111+
&self,
112+
snapshot: &SnapshotRef,
113+
table_metadata: &TableMetadataRef,
114+
) -> Result<Arc<ManifestList>> {
115+
if self.cache_disabled {
116+
return snapshot
117+
.load_manifest_list(&self.file_io, table_metadata)
118+
.await
119+
.map(Arc::new);
120+
}
121+
122+
let key = CachedObjectKey::ManifestList((
123+
snapshot.manifest_list().to_string(),
124+
table_metadata.format_version,
125+
snapshot.schema_id().unwrap(),
126+
));
127+
let cache_entry = self
128+
.cache
129+
.entry_by_ref(&key)
130+
.or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
131+
.await
132+
.map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
133+
.into_value();
134+
135+
match cache_entry {
136+
CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list),
137+
_ => Err(Error::new(
138+
ErrorKind::Unexpected,
139+
format!("cached object for path '{:?}' is not a Manifest", key),
140+
)),
141+
}
142+
}
143+
144+
async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
145+
let manifest = manifest_file.load_manifest(&self.file_io).await?;
146+
147+
Ok(CachedItem::Manifest(Arc::new(manifest)))
148+
}
149+
150+
async fn fetch_and_parse_manifest_list(
151+
&self,
152+
snapshot: &SnapshotRef,
153+
table_metadata: &TableMetadataRef,
154+
) -> Result<CachedItem> {
155+
let manifest_list = snapshot
156+
.load_manifest_list(&self.file_io, table_metadata)
157+
.await?;
158+
159+
Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
160+
}
161+
}

0 commit comments

Comments
 (0)