Skip to content

Commit 50caaf9

Browse files
committed
update files in distributed mode to use hash
1 parent acd8afb commit 50caaf9

File tree

12 files changed

+152
-80
lines changed

12 files changed

+152
-80
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] }
105105
path-clean = "1.0.1"
106106
prost = "0.12.3"
107107
prometheus-parse = "0.2.5"
108+
sha2 = "0.10.8"
108109

109110
[build-dependencies]
110111
cargo_toml = "0.15"

server/src/catalog.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
catalog::manifest::Manifest,
2626
query::PartialTimeFilter,
2727
storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
28-
utils::get_address,
28+
utils::get_url,
2929
};
3030

3131
use self::{column::Column, snapshot::ManifestItem};
@@ -117,7 +117,7 @@ pub async fn update_snapshot(
117117

118118
let mut ch = false;
119119
for m in manifests.iter() {
120-
let s = get_address();
120+
let s = get_url();
121121
let p = format!(
122122
"{}.{}.{}",
123123
s.domain().unwrap(),
@@ -156,7 +156,7 @@ pub async fn update_snapshot(
156156
..Manifest::default()
157157
};
158158

159-
let addr = get_address();
159+
let addr = get_url();
160160
let mainfest_file_name = format!(
161161
"{}.{}.{}",
162162
addr.domain().unwrap(),
@@ -195,7 +195,7 @@ pub async fn update_snapshot(
195195
..Manifest::default()
196196
};
197197

198-
let addr = get_address();
198+
let addr = get_url();
199199
let mainfest_file_name = format!(
200200
"{}.{}.{}",
201201
addr.domain().unwrap(),

server/src/handlers/http/cluster/mod.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
372372
if check_liveness(&domain_name).await {
373373
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
374374
}
375-
376-
let url = Url::parse(&domain_name).unwrap();
377-
let ingestor_meta_filename = ingestor_metadata_path(
378-
url.host_str().unwrap().to_owned(),
379-
url.port().unwrap().to_string(),
380-
)
381-
.to_string();
382375
let object_store = CONFIG.storage().get_object_store();
376+
377+
let ingestor_metadatas = object_store
378+
.get_objects(
379+
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
380+
Box::new(|file_name| file_name.starts_with("ingestor")),
381+
)
382+
.await?;
383+
384+
let ingestor_metadata = ingestor_metadatas
385+
.iter()
386+
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
387+
.collect_vec();
388+
389+
let ingestor_metadata = ingestor_metadata
390+
.iter()
391+
.filter(|elem| elem.domain_name == domain_name)
392+
.collect_vec();
393+
394+
let ingestor_meta_filename =
395+
ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string();
383396
let msg = match object_store
384397
.try_delete_ingestor_meta(ingestor_meta_filename)
385398
.await

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::rbac::role::Action;
2828
use crate::storage;
2929
use crate::storage::object_storage::ingestor_metadata_path;
3030
use crate::storage::object_storage::parseable_json_path;
31+
use crate::storage::staging;
3132
use crate::storage::ObjectStorageError;
3233
use crate::sync;
3334

@@ -36,24 +37,25 @@ use super::ssl_acceptor::get_ssl_acceptor;
3637
use super::IngestorMetadata;
3738
use super::OpenIdClient;
3839
use super::ParseableServer;
39-
use super::DEFAULT_VERSION;
4040

41-
use crate::utils::get_address;
4241
use actix_web::body::MessageBody;
4342
use actix_web::Scope;
4443
use actix_web::{web, App, HttpServer};
4544
use actix_web_prometheus::PrometheusMetrics;
4645
use async_trait::async_trait;
4746
use base64::Engine;
4847
use itertools::Itertools;
48+
use once_cell::sync::Lazy;
4949
use relative_path::RelativePathBuf;
50-
use url::Url;
5150

5251
use crate::{
5352
handlers::http::{base_path, cross_origin_config},
5453
option::CONFIG,
5554
};
5655

56+
pub static INGESTOR_META: Lazy<IngestorMetadata> =
57+
Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable"));
58+
5759
#[derive(Default)]
5860
pub struct IngestServer;
5961

@@ -102,6 +104,7 @@ impl ParseableServer for IngestServer {
102104
/// implement the init method will just invoke the initialize method
103105
async fn init(&self) -> anyhow::Result<()> {
104106
self.validate()?;
107+
105108
// check for querier state. Is it there, or was it there in the past
106109
self.check_querier_state().await?;
107110
// to get the .parseable.json file in staging
@@ -181,46 +184,23 @@ impl IngestServer {
181184
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
182185
let store = CONFIG.storage().get_object_store();
183186

184-
let sock = get_address();
185-
let path = ingestor_metadata_path(
186-
sock.domain().unwrap().to_string(),
187-
sock.port().unwrap_or_default().to_string(),
188-
);
187+
// find the meta file in staging if not generate new metadata
188+
let resource = INGESTOR_META.clone();
189+
// use the id that was generated/found in the staging and
190+
// generate the path for the object store
191+
let path = ingestor_metadata_path(None);
189192

190193
if store.get_object(&path).await.is_ok() {
191-
println!("ingestor metadata already exists");
194+
log::info!("ingestor metadata already exists");
192195
return Ok(());
193196
};
194197

195-
let scheme = CONFIG.parseable.get_scheme();
196-
let resource = IngestorMetadata::new(
197-
sock.port().unwrap_or_default().to_string(),
198-
CONFIG
199-
.parseable
200-
.domain_address
201-
.clone()
202-
.unwrap_or_else(|| {
203-
Url::parse(&format!(
204-
"{}://{}:{}",
205-
scheme,
206-
sock.domain().unwrap(),
207-
sock.port().unwrap_or_default()
208-
))
209-
.unwrap()
210-
})
211-
.to_string(),
212-
DEFAULT_VERSION.to_string(),
213-
store.get_bucket_name(),
214-
&CONFIG.parseable.username,
215-
&CONFIG.parseable.password,
216-
);
217-
218198
let resource = serde_json::to_string(&resource)
219199
.unwrap()
220200
.try_into_bytes()
221201
.unwrap();
222202

223-
store.put_object(&path, resource).await?;
203+
store.put_object(&path, resource.clone()).await?;
224204

225205
Ok(())
226206
}
@@ -279,6 +259,7 @@ impl IngestServer {
279259
}
280260

281261
async fn initialize(&self) -> anyhow::Result<()> {
262+
// ! Undefined and Untested behaviour
282263
if let Some(cache_manager) = LocalCacheManager::global() {
283264
cache_manager
284265
.validate(CONFIG.parseable.local_cache_size)

server/src/handlers/http/modal/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub struct IngestorMetadata {
6161
pub domain_name: String,
6262
pub bucket_name: String,
6363
pub token: String,
64+
pub ingestor_id: String,
6465
}
6566

6667
impl IngestorMetadata {
@@ -71,6 +72,7 @@ impl IngestorMetadata {
7172
bucket_name: String,
7273
username: &str,
7374
password: &str,
75+
ingestor_id: String,
7476
) -> Self {
7577
let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
7678

@@ -82,8 +84,13 @@ impl IngestorMetadata {
8284
version,
8385
bucket_name,
8486
token,
87+
ingestor_id,
8588
}
8689
}
90+
91+
pub fn get_ingestor_id(&self) -> String {
92+
self.ingestor_id.clone()
93+
}
8794
}
8895

8996
#[cfg(test)]
@@ -102,9 +109,10 @@ mod test {
102109
"somebucket".to_string(),
103110
"admin",
104111
"admin",
112+
"ingestor_id".to_string(),
105113
);
106114

107-
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap();
115+
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();
108116

109117
assert_eq!(rhs, lhs);
110118
}
@@ -118,13 +126,14 @@ mod test {
118126
"somebucket".to_string(),
119127
"admin",
120128
"admin",
129+
"ingestor_id".to_string(),
121130
);
122131

123132
let lhs = serde_json::to_string(&im)
124133
.unwrap()
125134
.try_into_bytes()
126135
.unwrap();
127-
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#
136+
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
128137
.try_into_bytes()
129138
.unwrap();
130139

server/src/handlers/http/modal/query_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,12 @@ impl QueryServer {
125125
.service(Server::get_llm_webscope())
126126
.service(Server::get_oauth_webscope(oidc_client))
127127
.service(Server::get_user_role_webscope())
128-
.service(Self::get_cluster_info_web_scope()),
128+
.service(Self::get_cluster_web_scope()),
129129
)
130130
.service(Server::get_generated());
131131
}
132132

133-
fn get_cluster_info_web_scope() -> actix_web::Scope {
133+
fn get_cluster_web_scope() -> actix_web::Scope {
134134
web::scope("/cluster")
135135
.service(
136136
// GET "/cluster/info" ==> Get info of the cluster

server/src/metrics/prom_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::utils::get_address;
1+
use crate::utils::get_url;
22
use prometheus_parse::Sample as PromSample;
33
use prometheus_parse::Value as PromValue;
44
use serde::Serialize;
@@ -22,7 +22,7 @@ struct StorageMetrics {
2222

2323
impl Default for Metrics {
2424
fn default() -> Self {
25-
let socket = get_address();
25+
let socket = get_url();
2626
let address = format!(
2727
"http://{}:{}",
2828
socket.domain().unwrap(),

server/src/storage/object_storage.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use super::{
2525
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2626
};
2727

28+
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
2829
use crate::option::Mode;
29-
use crate::utils::get_address;
30+
use crate::utils::get_url;
3031
use crate::{
3132
alerts::Alerts,
3233
catalog::{self, manifest::Manifest, snapshot::Snapshot},
@@ -257,7 +258,7 @@ pub trait ObjectStorage: Sync + 'static {
257258
let stream_metadata = match self.get_object(&stream_json_path(stream_name)).await {
258259
Ok(data) => data,
259260
Err(_) => {
260-
// ! this is hard coded for now
261+
// get the base stream metadata
261262
let bytes = self
262263
.get_object(&RelativePathBuf::from_iter([
263264
stream_name,
@@ -538,11 +539,9 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
538539
fn schema_path(stream_name: &str) -> RelativePathBuf {
539540
match CONFIG.parseable.mode {
540541
Mode::Ingest => {
541-
let addr = get_address();
542542
let file_name = format!(
543-
".ingestor.{}.{}{}",
544-
addr.domain().unwrap(),
545-
addr.port().unwrap_or_default(),
543+
".ingestor.{}{}",
544+
INGESTOR_META.ingestor_id.clone(),
546545
SCHEMA_FILE_NAME
547546
);
548547

@@ -558,11 +557,9 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
558557
pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
559558
match &CONFIG.parseable.mode {
560559
Mode::Ingest => {
561-
let addr = get_address();
562560
let file_name = format!(
563-
".ingestor.{}.{}{}",
564-
addr.domain().unwrap(),
565-
addr.port().unwrap_or_default(),
561+
".ingestor.{}{}",
562+
INGESTOR_META.get_ingestor_id(),
566563
STREAM_METADATA_FILE_NAME
567564
);
568565
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
@@ -581,14 +578,15 @@ pub fn parseable_json_path() -> RelativePathBuf {
581578
RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, PARSEABLE_METADATA_FILE_NAME])
582579
}
583580

581+
/// TODO: Needs to be updated for distributed mode
584582
#[inline(always)]
585583
fn alert_json_path(stream_name: &str) -> RelativePathBuf {
586584
RelativePathBuf::from_iter([stream_name, ALERT_FILE_NAME])
587585
}
588586

589587
#[inline(always)]
590588
fn manifest_path(prefix: &str) -> RelativePathBuf {
591-
let addr = get_address();
589+
let addr = get_url();
592590
let mainfest_file_name = format!(
593591
"{}.{}.{}",
594592
addr.domain().unwrap(),
@@ -599,9 +597,16 @@ fn manifest_path(prefix: &str) -> RelativePathBuf {
599597
}
600598

601599
#[inline(always)]
602-
pub fn ingestor_metadata_path(ip: String, port: String) -> RelativePathBuf {
600+
pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf {
601+
if let Some(id) = id {
602+
return RelativePathBuf::from_iter([
603+
PARSEABLE_ROOT_DIRECTORY,
604+
&format!("ingestor.{}.json", id),
605+
]);
606+
}
607+
603608
RelativePathBuf::from_iter([
604609
PARSEABLE_ROOT_DIRECTORY,
605-
&format!("ingestor.{}.{}.json", ip, port),
610+
&format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()),
606611
])
607612
}

0 commit comments

Comments
 (0)