Skip to content

Commit 8e710f2

Browse files
fix: update files in distributed mode to use hash (#761)
This PR ensures all metadata and data files (JSON and Parquet) use a simple SHA-256-based hash naming mechanism. Each ingestor allocates itself a unique hash, which is used in all file names relevant to that ingestor. This hash is also persisted in the metadata file content and is expected to remain the same for the lifecycle of the ingestor. --------- Co-authored-by: Nikhil Sinha <[email protected]>
1 parent fb8a105 commit 8e710f2

24 files changed

+378
-171
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] }
105105
path-clean = "1.0.1"
106106
prost = "0.12.3"
107107
prometheus-parse = "0.2.5"
108+
sha2 = "0.10.8"
108109

109110
[build-dependencies]
110111
cargo_toml = "0.15"

server/src/analytics.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub struct Report {
7474
}
7575

7676
impl Report {
77-
pub async fn new() -> Self {
77+
pub async fn new() -> anyhow::Result<Self> {
7878
let mut upt: f64 = 0.0;
7979
if let Ok(uptime) = uptime_lib::get() {
8080
upt = uptime.as_secs_f64();
@@ -91,9 +91,9 @@ impl Report {
9191
cpu_count = info.cpus().len();
9292
mem_total = info.total_memory();
9393
}
94-
let ingestor_metrics = fetch_ingestors_metrics().await;
94+
let ingestor_metrics = fetch_ingestors_metrics().await?;
9595

96-
Self {
96+
Ok(Self {
9797
deployment_id: storage::StorageMetadata::global().deployment_id,
9898
uptime: upt,
9999
report_created_at: Utc::now(),
@@ -113,7 +113,7 @@ impl Report {
113113
total_json_bytes: ingestor_metrics.4,
114114
total_parquet_bytes: ingestor_metrics.5,
115115
metrics: build_metrics().await,
116-
}
116+
})
117117
}
118118

119119
pub async fn send(&self) {
@@ -148,7 +148,7 @@ fn total_event_stats() -> (u64, u64, u64) {
148148
(total_events, total_json_bytes, total_parquet_bytes)
149149
}
150150

151-
async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
151+
async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
152152
let event_stats = total_event_stats();
153153
let mut node_metrics =
154154
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
@@ -181,24 +181,24 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
181181
.header(header::CONTENT_TYPE, "application/json")
182182
.send()
183183
.await
184-
.unwrap(); // should respond
184+
.expect("should respond");
185185

186-
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
186+
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
187187
vec.push(data);
188188
active_ingestors += 1;
189189
}
190190

191191
node_metrics.accumulate(&mut vec);
192192
}
193193

194-
(
194+
Ok((
195195
active_ingestors,
196196
offline_ingestors,
197197
node_metrics.stream_count,
198198
node_metrics.total_events_count,
199199
node_metrics.total_json_bytes,
200200
node_metrics.total_parquet_bytes,
201-
)
201+
))
202202
}
203203

204204
async fn build_metrics() -> HashMap<String, Value> {
@@ -220,14 +220,23 @@ async fn build_metrics() -> HashMap<String, Value> {
220220
metrics
221221
}
222222

223-
pub fn init_analytics_scheduler() {
223+
pub fn init_analytics_scheduler() -> anyhow::Result<()> {
224224
log::info!("Setting up schedular for anonymous user analytics");
225225

226226
let mut scheduler = AsyncScheduler::new();
227227
scheduler
228228
.every(ANALYTICS_SEND_INTERVAL_SECONDS)
229229
.run(move || async {
230-
Report::new().await.send().await;
230+
Report::new()
231+
.await
232+
.unwrap_or_else(|err| {
233+
// panicing because seperate thread
234+
// TODO: a better way to handle this
235+
log::error!("Error while sending analytics: {}", err.to_string());
236+
panic!("{}", err.to_string());
237+
})
238+
.send()
239+
.await;
231240
});
232241

233242
tokio::spawn(async move {
@@ -236,6 +245,8 @@ pub fn init_analytics_scheduler() {
236245
tokio::time::sleep(Duration::from_secs(10)).await;
237246
}
238247
});
248+
249+
Ok(())
239250
}
240251

241252
#[derive(Serialize, Deserialize, Default, Debug)]

server/src/catalog.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616
*
1717
*/
1818

19-
use std::sync::Arc;
20-
21-
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
22-
use relative_path::RelativePathBuf;
19+
use std::{io::ErrorKind, sync::Arc};
2320

2421
use crate::{
2522
catalog::manifest::Manifest,
2623
query::PartialTimeFilter,
27-
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
28-
utils::get_address,
24+
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
2925
};
26+
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
27+
use relative_path::RelativePathBuf;
28+
use std::io::Error as IOError;
3029

3130
use self::{column::Column, snapshot::ManifestItem};
3231

@@ -117,8 +116,7 @@ pub async fn update_snapshot(
117116

118117
let mut ch = false;
119118
for m in manifests.iter() {
120-
let s = get_address();
121-
let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
119+
let p = manifest_path("").to_string();
122120
if m.manifest_path.contains(&p) {
123121
ch = true;
124122
}
@@ -142,7 +140,11 @@ pub async fn update_snapshot(
142140
23 * 3600 + 59 * 60 + 59,
143141
999_999_999,
144142
)
145-
.unwrap(),
143+
.ok_or(IOError::new(
144+
ErrorKind::Other,
145+
"Failed to create upper bound for manifest",
146+
))
147+
.map_err(ObjectStorageError::IoError)?,
146148
)
147149
.and_utc();
148150

@@ -151,12 +153,11 @@ pub async fn update_snapshot(
151153
..Manifest::default()
152154
};
153155

154-
let addr = get_address();
155-
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
156+
let mainfest_file_name = manifest_path("").to_string();
156157
let path =
157158
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
158159
storage
159-
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
160+
.put_object(&path, serde_json::to_vec(&manifest)?.into())
160161
.await?;
161162
let path = storage.absolute_url(&path);
162163
let new_snapshot_entriy = snapshot::ManifestItem {
@@ -185,8 +186,7 @@ pub async fn update_snapshot(
185186
..Manifest::default()
186187
};
187188

188-
let addr = get_address();
189-
let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
189+
let mainfest_file_name = manifest_path("").to_string();
190190
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
191191
storage
192192
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())

server/src/cli.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub struct Cli {
8888
pub mode: Mode,
8989

9090
/// public address for the parseable server ingestor
91-
pub ingestor_url: String,
91+
pub ingestor_endpoint: String,
9292
}
9393

9494
impl Cli {
@@ -115,7 +115,7 @@ impl Cli {
115115
pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
116116
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
117117
pub const MODE: &'static str = "mode";
118-
pub const INGESTOR_URL: &'static str = "ingestor-url";
118+
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
119119
pub const DEFAULT_USERNAME: &'static str = "admin";
120120
pub const DEFAULT_PASSWORD: &'static str = "admin";
121121

@@ -317,9 +317,9 @@ impl Cli {
317317
.help("Mode of operation"),
318318
)
319319
.arg(
320-
Arg::new(Self::INGESTOR_URL)
321-
.long(Self::INGESTOR_URL)
322-
.env("P_INGESTOR_URL")
320+
Arg::new(Self::INGESTOR_ENDPOINT)
321+
.long(Self::INGESTOR_ENDPOINT)
322+
.env("P_INGESTOR_ENDPOINT")
323323
.value_name("URL")
324324
.required(false)
325325
.help("URL to connect to this specific ingestor. Default is the address of the server.")
@@ -367,8 +367,8 @@ impl FromArgMatches for Cli {
367367
.cloned()
368368
.expect("default value for address");
369369

370-
self.ingestor_url = m
371-
.get_one::<String>(Self::INGESTOR_URL)
370+
self.ingestor_endpoint = m
371+
.get_one::<String>(Self::INGESTOR_ENDPOINT)
372372
.cloned()
373373
.unwrap_or_else(String::default);
374374

server/src/handlers/http.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
8585
.await?
8686
.iter()
8787
// we should be able to unwrap as we know the data is valid schema
88-
.map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
88+
.map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
8989
.collect_vec();
9090

9191
let new_schema = Schema::try_merge(res)?;
@@ -97,7 +97,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
9797
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
9898
// send the query request to the ingestor
9999
let mut res = vec![];
100-
let ima = get_ingestor_info().await.unwrap();
100+
let ima = get_ingestor_info().await?;
101101

102102
for im in ima.iter() {
103103
let uri = format!(

server/src/handlers/http/cluster/mod.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use chrono::Utc;
3737
use http::StatusCode;
3838
use itertools::Itertools;
3939
use relative_path::RelativePathBuf;
40+
use serde::de::Error;
41+
use serde_json::error::Error as SerdeError;
4042
use serde_json::Value as JsonValue;
4143
use url::Url;
4244

@@ -262,9 +264,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
262264
StreamError::SerdeError(err)
263265
})?
264266
.get("staging")
265-
.unwrap()
267+
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
268+
"staging",
269+
)))?
266270
.as_str()
267-
.unwrap()
271+
.ok_or(StreamError::SerdeError(SerdeError::custom(
272+
"staging path not a string/ not provided",
273+
)))?
268274
.to_string();
269275

270276
(true, sp, None, status)
@@ -304,7 +310,9 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
304310
&ingestor.domain_name,
305311
base_path_without_preceding_slash()
306312
))
307-
.unwrap();
313+
.map_err(|err| {
314+
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
315+
})?;
308316

309317
let res = reqwest::Client::new()
310318
.get(uri)
@@ -362,14 +370,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
362370
if check_liveness(&domain_name).await {
363371
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
364372
}
365-
366-
let url = Url::parse(&domain_name).unwrap();
367-
let ingestor_meta_filename = ingestor_metadata_path(
368-
url.host_str().unwrap().to_owned(),
369-
url.port().unwrap().to_string(),
370-
)
371-
.to_string();
372373
let object_store = CONFIG.storage().get_object_store();
374+
375+
let ingestor_metadatas = object_store
376+
.get_objects(
377+
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
378+
Box::new(|file_name| file_name.starts_with("ingestor")),
379+
)
380+
.await?;
381+
382+
let ingestor_metadata = ingestor_metadatas
383+
.iter()
384+
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
385+
.collect_vec();
386+
387+
let ingestor_metadata = ingestor_metadata
388+
.iter()
389+
.filter(|elem| elem.domain_name == domain_name)
390+
.collect_vec();
391+
392+
let ingestor_meta_filename =
393+
ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string();
373394
let msg = match object_store
374395
.try_delete_ingestor_meta(ingestor_meta_filename)
375396
.await

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
169169
}
170170

171171
if !body.is_empty() && static_schema_flag == "true" {
172-
let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
172+
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
173173
let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
174174
if let Ok(parsed_schema) = parsed_schema {
175175
schema = parsed_schema;
@@ -357,7 +357,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
357357
None
358358
};
359359

360-
let hash_map = STREAM_INFO.read().unwrap();
360+
let hash_map = STREAM_INFO.read().expect("Readable");
361361
let stream_meta = &hash_map
362362
.get(&stream_name)
363363
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
@@ -396,7 +396,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
396396
stats
397397
};
398398

399-
let stats = serde_json::to_value(stats).unwrap();
399+
let stats = serde_json::to_value(stats)?;
400400

401401
Ok((web::Json(stats), StatusCode::OK))
402402
}

0 commit comments

Comments
 (0)