Skip to content

Commit 8c05958

Browse files
committed
fix: use hash for distributed mode
1 parent dd5bae6 commit 8c05958

File tree

8 files changed

+45
-58
lines changed

8 files changed

+45
-58
lines changed

server/src/catalog.rs

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616
*
1717
*/
1818

19-
use std::sync::Arc;
20-
21-
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
22-
use relative_path::RelativePathBuf;
19+
use std::{io::ErrorKind, sync::Arc};
2320

2421
use crate::{
2522
catalog::manifest::Manifest,
2623
query::PartialTimeFilter,
27-
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
28-
utils::get_url,
24+
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
2925
};
26+
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
27+
use relative_path::RelativePathBuf;
28+
use std::io::Error as IOError;
3029

3130
use self::{column::Column, snapshot::ManifestItem};
3231

@@ -117,13 +116,7 @@ pub async fn update_snapshot(
117116

118117
let mut ch = false;
119118
for m in manifests.iter() {
120-
let s = get_url();
121-
let p = format!(
122-
"{}.{}.{}",
123-
s.domain().unwrap(),
124-
s.port().unwrap_or_default(),
125-
MANIFEST_FILE
126-
);
119+
let p = manifest_path("").to_string();
127120
if m.manifest_path.contains(&p) {
128121
ch = true;
129122
}
@@ -147,7 +140,11 @@ pub async fn update_snapshot(
147140
23 * 3600 + 59 * 60 + 59,
148141
999_999_999,
149142
)
150-
.unwrap(),
143+
.ok_or(IOError::new(
144+
ErrorKind::Other,
145+
"Failed to create upper bound for manifest",
146+
))
147+
.map_err(ObjectStorageError::IoError)?,
151148
)
152149
.and_utc();
153150

@@ -156,17 +153,11 @@ pub async fn update_snapshot(
156153
..Manifest::default()
157154
};
158155

159-
let addr = get_url();
160-
let mainfest_file_name = format!(
161-
"{}.{}.{}",
162-
addr.domain().unwrap(),
163-
addr.port().unwrap_or_default(),
164-
MANIFEST_FILE
165-
);
156+
let mainfest_file_name = manifest_path("").to_string();
166157
let path =
167158
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
168159
storage
169-
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
160+
.put_object(&path, serde_json::to_vec(&manifest)?.into())
170161
.await?;
171162
let path = storage.absolute_url(&path);
172163
let new_snapshot_entriy = snapshot::ManifestItem {
@@ -195,13 +186,7 @@ pub async fn update_snapshot(
195186
..Manifest::default()
196187
};
197188

198-
let addr = get_url();
199-
let mainfest_file_name = format!(
200-
"{}.{}.{}",
201-
addr.domain().unwrap(),
202-
addr.port().unwrap(),
203-
MANIFEST_FILE
204-
);
189+
let mainfest_file_name = manifest_path("").to_string();
205190
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
206191
storage
207192
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())

server/src/migration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ async fn run_meta_file_migration(
244244
// we can unwrap here because we know the file exists
245245
let new_path = RelativePathBuf::from_iter([
246246
PARSEABLE_ROOT_DIRECTORY,
247-
file.file_name().unwrap(),
247+
file.file_name().expect("should have a file name"),
248248
]);
249249
object_store.put_object(&new_path, bytes).await?;
250250
object_store.delete_object(&file).await?;
@@ -281,7 +281,7 @@ async fn run_stream_files_migration(
281281
let new_path = RelativePathBuf::from_iter([
282282
stream.as_str(),
283283
STREAM_ROOT_DIRECTORY,
284-
path.file_name().unwrap(),
284+
path.file_name().expect("should have a file name"),
285285
]);
286286
object_store.put_object(&new_path, bytes).await?;
287287
object_store.delete_object(&path).await?;

server/src/option.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl Mode {
229229
"Query" => Ok(Mode::Query),
230230
"Ingest" => Ok(Mode::Ingest),
231231
"All" => Ok(Mode::All),
232-
x => Err(format!("Invalid mode: {}", x)),
232+
x => Err(format!("Trying to Parse Invalid mode: {}", x)),
233233
}
234234
}
235235
}

server/src/static_schema.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ pub struct StaticSchema {
1212
}
1313

1414
#[derive(Serialize, Deserialize, Debug)]
15-
1615
pub struct SchemaFields {
1716
name: String,
1817
data_type: String,

server/src/storage/object_storage.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727

2828
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
2929
use crate::option::Mode;
30-
use crate::utils::get_url;
30+
3131
use crate::{
3232
alerts::Alerts,
3333
catalog::{self, manifest::Manifest, snapshot::Snapshot},
@@ -585,15 +585,17 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf {
585585
}
586586

587587
#[inline(always)]
588-
fn manifest_path(prefix: &str) -> RelativePathBuf {
589-
let addr = get_url();
590-
let mainfest_file_name = format!(
591-
"{}.{}.{}",
592-
addr.domain().unwrap(),
593-
addr.port().unwrap_or_default(),
594-
MANIFEST_FILE
595-
);
596-
RelativePathBuf::from_iter([prefix, &mainfest_file_name])
588+
pub fn manifest_path(prefix: &str) -> RelativePathBuf {
589+
if CONFIG.parseable.mode == Mode::Ingest {
590+
let manifest_file_name = format!(
591+
"ingestor.{}.{}",
592+
INGESTOR_META.get_ingestor_id(),
593+
MANIFEST_FILE
594+
);
595+
RelativePathBuf::from_iter([prefix, &manifest_file_name])
596+
} else {
597+
RelativePathBuf::from_iter([MANIFEST_FILE])
598+
}
597599
}
598600

599601
#[inline(always)]

server/src/storage/staging.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use crate::{
3131
metrics,
3232
option::{Mode, CONFIG},
3333
storage::OBJECT_STORE_DATA_GRANULARITY,
34-
utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url},
34+
utils::{
35+
self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url,
36+
hostname_unchecked,
37+
},
3538
};
3639
use arrow_schema::{ArrowError, Schema};
3740
use chrono::{NaiveDateTime, Timelike, Utc};
@@ -64,11 +67,12 @@ impl StorageDir {
6467
+ &utils::hour_to_prefix(time.hour())
6568
+ &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap();
6669
let local_uri = str::replace(&uri, "/", ".");
70+
let hostname = hostname_unchecked();
6771
if CONFIG.parseable.mode == Mode::Ingest {
6872
let id = INGESTOR_META.get_ingestor_id();
69-
format!("{local_uri}{id}.{extention}")
73+
format!("{local_uri}{hostname}{id}.{extention}")
7074
} else {
71-
format!("{local_uri}.{extention}")
75+
format!("{local_uri}{hostname}.{extention}")
7276
}
7377
}
7478

@@ -163,7 +167,7 @@ impl StorageDir {
163167
fn arrow_path_to_parquet(path: &Path) -> PathBuf {
164168
let filename = path.file_name().unwrap().to_str().unwrap();
165169
let (_, filename) = filename.split_once('.').unwrap();
166-
let filename = filename.rsplit_once('.').unwrap();
170+
let filename = filename.rsplit_once('.').expect("contains the delim `.`");
167171
let filename = format!("{}.{}", filename.0, filename.1);
168172

169173
/*
@@ -316,7 +320,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
316320
let store = CONFIG.storage().get_object_store();
317321
let url = get_url();
318322
let out = IngestorMetadata::new(
319-
url.port().unwrap().to_string(), // here port should be defined
323+
url.port().expect("here port should be defined").to_string(),
320324
url.to_string(),
321325
DEFAULT_VERSION.to_string(),
322326
store.get_bucket_name(),

server/src/sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::time::Duration;
2828
use crate::option::CONFIG;
2929
use crate::{storage, STORAGE_UPLOAD_INTERVAL};
3030

31-
pub(crate) fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
31+
pub fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
3232
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
3333
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
3434
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
@@ -70,7 +70,7 @@ pub(crate) fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, one
7070
(handle, outbox_rx, inbox_tx)
7171
}
7272

73-
pub(crate) fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
73+
pub fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
7474
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
7575
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
7676
let mut inbox_rx = AssertUnwindSafe(inbox_rx);

server/src/utils.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,11 @@ pub fn hostname() -> Option<String> {
3636
.ok()
3737
.and_then(|hostname| hostname.into_string().ok())
3838
}
39-
#[allow(dead_code)]
39+
4040
pub fn hostname_unchecked() -> String {
4141
hostname::get().unwrap().into_string().unwrap()
4242
}
4343

44-
#[allow(dead_code)]
45-
pub fn capitalize_ascii(s: &str) -> String {
46-
s[0..1].to_uppercase() + &s[1..]
47-
}
48-
4944
/// Convert minutes to a slot range
5045
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
5146
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
@@ -235,7 +230,7 @@ pub fn get_url() -> Url {
235230
CONFIG.parseable.address
236231
)
237232
.parse::<Url>() // if the value was improperly set, this will panic before hand
238-
.unwrap();
233+
.expect("Valid URL");
239234
}
240235
let addr_from_env = CONFIG
241236
.parseable
@@ -260,7 +255,9 @@ pub fn get_url() -> Url {
260255
let var_port = port[1..].to_string();
261256
port = get_from_env(&var_port);
262257
}
263-
format!("{}:{}", hostname, port).parse::<Url>().unwrap()
258+
format!("{}:{}", hostname, port)
259+
.parse::<Url>()
260+
.expect("Valid URL")
264261
}
265262

266263
/// util fuction to fetch value from an env var

0 commit comments

Comments
 (0)