Skip to content

Commit 6a9fc23

Browse files
committed
refactor
1 parent 79321a4 commit 6a9fc23

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

src/catalog/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,12 @@ pub async fn remove_manifest_from_snapshot(
340340
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
341341
}
342342
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
343-
Mode::Index => unimplemented!(),
343+
Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
344+
std::io::Error::new(
345+
std::io::ErrorKind::Unsupported,
346+
"Can't remove manifest from within Index server",
347+
),
348+
))),
344349
}
345350
}
346351

src/enterprise/utils.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub async fn fetch_parquet_file_paths(
6666
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
6767
let glob_storage = PARSEABLE.storage.get_object_store();
6868

69-
let object_store_format = glob_storage.get_object_store_format(stream).await.unwrap();
69+
let object_store_format = glob_storage.get_object_store_format(stream).await?;
7070

7171
let time_partition = object_store_format.time_partition;
7272

@@ -103,8 +103,7 @@ pub async fn fetch_parquet_file_paths(
103103
.map(|item| item.manifest_path)
104104
.collect(),
105105
)
106-
.await
107-
.unwrap();
106+
.await?;
108107

109108
let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();
110109

@@ -137,24 +136,24 @@ pub async fn fetch_parquet_file_paths(
137136
async fn collect_manifest_files(
138137
storage: Arc<dyn ObjectStorage>,
139138
manifest_urls: Vec<String>,
140-
) -> Result<Vec<Manifest>, object_store::Error> {
139+
) -> Result<Vec<Manifest>, ObjectStorageError> {
141140
let mut tasks = Vec::new();
142141
manifest_urls.into_iter().for_each(|path| {
143-
let path = RelativePathBuf::from_path(PathBuf::from(path)).unwrap();
142+
let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
144143
let storage = Arc::clone(&storage);
145144
tasks.push(tokio::task::spawn(async move {
146-
storage.get_object(&path).await.unwrap()
145+
storage.get_object(&path).await
147146
}));
148147
});
149148

150149
let mut op = Vec::new();
151150
for task in tasks {
152-
let file = task.await.unwrap();
151+
let file = task.await??;
153152
op.push(file);
154153
}
155154

156155
Ok(op
157156
.into_iter()
158-
.map(|res| serde_json::from_slice(&res).unwrap())
157+
.map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
159158
.collect())
160159
}

src/storage/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use chrono::Local;
2020
use object_store::path::Path;
2121
use relative_path::RelativePath;
2222
use serde::{Deserialize, Serialize};
23+
use tokio::task::JoinError;
2324

2425
use crate::{
2526
catalog::snapshot::Snapshot,
@@ -254,6 +255,9 @@ pub enum ObjectStorageError {
254255

255256
#[error("{0}")]
256257
StandaloneWithDistributed(#[from] StandaloneWithDistributed),
258+
259+
#[error("JoinError: {0}")]
260+
JoinError(#[from] JoinError),
257261
}
258262

259263
pub fn to_object_store_path(path: &RelativePath) -> Path {

0 commit comments

Comments
 (0)