Skip to content

Commit e586e13

Browse files
committed
fix: s3 get objects
Refactor object storage to filter objects by starts_with_pattern
1 parent ad5062e commit e586e13

File tree

5 files changed

+8
-4
lines changed

5 files changed

+8
-4
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub async fn fetch_stats_from_ingesters(
117117
let obs = CONFIG
118118
.storage()
119119
.get_object_store()
120-
.get_objects(Some(&path))
120+
.get_objects(Some(&path), ".ingester")
121121
.await?;
122122
let mut ingestion_size = 0u64;
123123
let mut storage_size = 0u64;
@@ -346,7 +346,7 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
346346

347347
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
348348
let arr = store
349-
.get_objects(Some(&root_path))
349+
.get_objects(Some(&root_path), "ingester")
350350
.await?
351351
.iter()
352352
// this unwrap will most definateley shoot me in the foot later

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl IngestServer {
251251
let store = CONFIG.storage().get_object_store();
252252
let base_path = RelativePathBuf::from("");
253253
let ingester_metadata = store
254-
.get_objects(Some(&base_path))
254+
.get_objects(Some(&base_path), "ingester")
255255
.await?
256256
.iter()
257257
// this unwrap will most definateley shoot me in the foot later

server/src/storage/localfs.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,11 @@ impl ObjectStorage for LocalFS {
189189
Ok(path_arr)
190190
}
191191

192+
/// currently it is not using the starts_with_pattern
192193
async fn get_objects(
193194
&self,
194195
base_path: Option<&RelativePath>,
196+
_starts_with_pattern: &str
195197
) -> Result<Vec<Bytes>, ObjectStorageError> {
196198
let time = Instant::now();
197199

server/src/storage/object_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
6969
async fn get_objects(
7070
&self,
7171
base_path: Option<&RelativePath>,
72+
starts_with_pattern: &str
7273
) -> Result<Vec<Bytes>, ObjectStorageError>;
7374
async fn put_object(
7475
&self,

server/src/storage/s3.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ impl ObjectStorage for S3 {
416416
async fn get_objects(
417417
&self,
418418
base_path: Option<&RelativePath>,
419+
starts_with_pattern: &str,
419420
) -> Result<Vec<Bytes>, ObjectStorageError> {
420421
let instant = Instant::now();
421422

@@ -430,7 +431,7 @@ impl ObjectStorage for S3 {
430431
let mut res = vec![];
431432

432433
while let Some(meta) = list_stream.next().await.transpose()? {
433-
let ingester_file = meta.location.filename().unwrap().starts_with("ingester");
434+
let ingester_file = meta.location.filename().unwrap().starts_with(starts_with_pattern);
434435

435436
if !ingester_file {
436437
continue;

0 commit comments

Comments
 (0)