Skip to content

Commit 4cdd9bd

Browse files
author
Devdutt Shenoi
committed
refactor and expose ingest server util
1 parent 838dd7f commit 4cdd9bd

File tree

1 file changed

+88
-88
lines changed

1 file changed

+88
-88
lines changed

src/handlers/http/modal/ingest_server.rs

Lines changed: 88 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@ impl ParseableServer for IngestServer {
8585
// parseable can't use local storage for persistence when running a distributed setup
8686
if CONFIG.get_storage_mode_string() == "Local drive" {
8787
return Err(anyhow::Error::msg(
88-
"This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
89-
));
88+
"This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
89+
));
9090
}
9191

9292
// check for querier state. Is it there, or was it there in the past
93-
let parseable_json = self.check_querier_state().await?;
93+
let parseable_json = check_querier_state().await?;
9494
// to get the .parseable.json file in staging
95-
self.validate_credentials().await?;
95+
validate_credentials().await?;
9696

9797
Ok(parseable_json)
9898
}
@@ -112,7 +112,7 @@ impl ParseableServer for IngestServer {
112112
tokio::spawn(airplane::server());
113113

114114
// set the ingestor metadata
115-
self.set_ingestor_metadata().await?;
115+
set_ingestor_metadata().await?;
116116

117117
// Ingestors shouldn't have to deal with OpenId auth flow
118118
let app = self.start(prometheus, None);
@@ -278,96 +278,96 @@ impl IngestServer {
278278
),
279279
)
280280
}
281+
}
281282

282-
// create the ingestor metadata and put the .ingestor.json file in the object store
283-
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
284-
let storage_ingestor_metadata = migrate_ingester_metadata().await?;
285-
let store = CONFIG.storage().get_object_store();
286-
287-
// find the meta file in staging if not generate new metadata
288-
let resource = INGESTOR_META.clone();
289-
// use the id that was generated/found in the staging and
290-
// generate the path for the object store
291-
let path = ingestor_metadata_path(None);
292-
293-
// we are considering that we can always get from object store
294-
if storage_ingestor_metadata.is_some() {
295-
let mut store_data = storage_ingestor_metadata.unwrap();
296-
297-
if store_data.domain_name != INGESTOR_META.domain_name {
298-
store_data
299-
.domain_name
300-
.clone_from(&INGESTOR_META.domain_name);
301-
store_data.port.clone_from(&INGESTOR_META.port);
302-
303-
let resource = Bytes::from(serde_json::to_vec(&store_data)?);
304-
305-
// if pushing to object store fails propagate the error
306-
return store
307-
.put_object(&path, resource)
308-
.await
309-
.map_err(|err| anyhow!(err));
310-
}
311-
} else {
312-
let resource = Bytes::from(serde_json::to_vec(&resource)?);
313-
314-
store.put_object(&path, resource).await?;
283+
// create the ingestor metadata and put the .ingestor.json file in the object store
284+
pub async fn set_ingestor_metadata() -> anyhow::Result<()> {
285+
let storage_ingestor_metadata = migrate_ingester_metadata().await?;
286+
let store = CONFIG.storage().get_object_store();
287+
288+
// find the meta file in staging if not generate new metadata
289+
let resource = INGESTOR_META.clone();
290+
// use the id that was generated/found in the staging and
291+
// generate the path for the object store
292+
let path = ingestor_metadata_path(None);
293+
294+
// we are considering that we can always get from object store
295+
if storage_ingestor_metadata.is_some() {
296+
let mut store_data = storage_ingestor_metadata.unwrap();
297+
298+
if store_data.domain_name != INGESTOR_META.domain_name {
299+
store_data
300+
.domain_name
301+
.clone_from(&INGESTOR_META.domain_name);
302+
store_data.port.clone_from(&INGESTOR_META.port);
303+
304+
let resource = Bytes::from(serde_json::to_vec(&store_data)?);
305+
306+
// if pushing to object store fails propagate the error
307+
return store
308+
.put_object(&path, resource)
309+
.await
310+
.map_err(|err| anyhow!(err));
315311
}
312+
} else {
313+
let resource = Bytes::from(serde_json::to_vec(&resource)?);
316314

317-
Ok(())
315+
store.put_object(&path, resource).await?;
318316
}
319317

320-
// check for querier state. Is it there, or was it there in the past
321-
// this should happen before the set the ingestor metadata
322-
async fn check_querier_state(&self) -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
323-
// how do we check for querier state?
324-
// based on the work flow of the system, the querier will always need to start first
325-
// i.e the querier will create the `.parseable.json` file
326-
327-
let store = CONFIG.storage().get_object_store();
328-
let path = parseable_json_path();
329-
330-
let parseable_json = store.get_object(&path).await;
331-
match parseable_json {
332-
Ok(_) => Ok(Some(parseable_json.unwrap())),
333-
Err(_) => Err(ObjectStorageError::Custom(
334-
"Query Server has not been started yet. Please start the querier server first."
335-
.to_string(),
336-
)),
337-
}
318+
Ok(())
319+
}
320+
321+
// check for querier state. Is it there, or was it there in the past
322+
// this should happen before the set the ingestor metadata
323+
async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
324+
// how do we check for querier state?
325+
// based on the work flow of the system, the querier will always need to start first
326+
// i.e the querier will create the `.parseable.json` file
327+
328+
let store = CONFIG.storage().get_object_store();
329+
let path = parseable_json_path();
330+
331+
let parseable_json = store.get_object(&path).await;
332+
match parseable_json {
333+
Ok(_) => Ok(Some(parseable_json.unwrap())),
334+
Err(_) => Err(ObjectStorageError::Custom(
335+
"Query Server has not been started yet. Please start the querier server first."
336+
.to_string(),
337+
)),
338338
}
339+
}
339340

340-
async fn validate_credentials(&self) -> anyhow::Result<()> {
341-
// check if your creds match with others
342-
let store = CONFIG.storage().get_object_store();
343-
let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
344-
let ingestor_metadata = store
345-
.get_objects(
346-
Some(&base_path),
347-
Box::new(|file_name| file_name.starts_with("ingestor")),
348-
)
349-
.await?;
350-
if !ingestor_metadata.is_empty() {
351-
let ingestor_metadata_value: Value =
352-
serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
353-
let check = ingestor_metadata_value
354-
.as_object()
355-
.and_then(|meta| meta.get("token"))
356-
.and_then(|token| token.as_str())
357-
.unwrap();
358-
359-
let token = base64::prelude::BASE64_STANDARD.encode(format!(
360-
"{}:{}",
361-
CONFIG.parseable.username, CONFIG.parseable.password
362-
));
363-
364-
let token = format!("Basic {}", token);
365-
366-
if check != token {
367-
return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
368-
}
341+
async fn validate_credentials() -> anyhow::Result<()> {
342+
// check if your creds match with others
343+
let store = CONFIG.storage().get_object_store();
344+
let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
345+
let ingestor_metadata = store
346+
.get_objects(
347+
Some(&base_path),
348+
Box::new(|file_name| file_name.starts_with("ingestor")),
349+
)
350+
.await?;
351+
if !ingestor_metadata.is_empty() {
352+
let ingestor_metadata_value: Value =
353+
serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
354+
let check = ingestor_metadata_value
355+
.as_object()
356+
.and_then(|meta| meta.get("token"))
357+
.and_then(|token| token.as_str())
358+
.unwrap();
359+
360+
let token = base64::prelude::BASE64_STANDARD.encode(format!(
361+
"{}:{}",
362+
CONFIG.parseable.username, CONFIG.parseable.password
363+
));
364+
365+
let token = format!("Basic {}", token);
366+
367+
if check != token {
368+
return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
369369
}
370-
371-
Ok(())
372370
}
371+
372+
Ok(())
373373
}

0 commit comments

Comments
 (0)