165 changes: 80 additions & 85 deletions src/handlers/http/modal/ingest_server.rs
@@ -45,7 +45,6 @@ use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
use anyhow::anyhow;
use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
@@ -85,14 +84,14 @@ impl ParseableServer for IngestServer {
// parseable can't use local storage for persistence when running a distributed setup
if CONFIG.get_storage_mode_string() == "Local drive" {
return Err(anyhow::Error::msg(
"This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
));
"This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
));
}

// check for querier state. Is it there, or was it there in the past
let parseable_json = self.check_querier_state().await?;
let parseable_json = check_querier_state().await?;
// to get the .parseable.json file in staging
self.validate_credentials().await?;
validate_credentials().await?;

Ok(parseable_json)
}
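
The dropped `self.` prefixes above are the core of this refactor: helpers that never read instance state move out of the `impl IngestServer` block and become free functions. A minimal sketch of the pattern, using simplified placeholder signatures rather than the real Parseable ones:

```rust
struct IngestServer;

impl IngestServer {
    // before: the helper was a `&self` method even though it never touched `self`
    // async fn check_querier_state(&self) -> anyhow::Result<()> { ... }
}

// after: the same body as a free async function; call sites change from
// `self.check_querier_state().await?` to `check_querier_state().await?`
async fn check_querier_state() -> Result<(), String> {
    Ok(())
}
```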
@@ -112,7 +111,7 @@ impl ParseableServer for IngestServer {
tokio::spawn(airplane::server());

// set the ingestor metadata
self.set_ingestor_metadata().await?;
set_ingestor_metadata().await?;

// Ingestors shouldn't have to deal with OpenId auth flow
let app = self.start(prometheus, None);
@@ -278,96 +277,92 @@ impl IngestServer {
),
)
}
}

// create the ingestor metadata and put the .ingestor.json file in the object store
pub async fn set_ingestor_metadata() -> anyhow::Result<()> {
let storage_ingestor_metadata = migrate_ingester_metadata().await?;
let store = CONFIG.storage().get_object_store();

// find the meta file in staging if not generate new metadata
let resource = INGESTOR_META.clone();
// use the id that was generated/found in the staging and
// generate the path for the object store
let path = ingestor_metadata_path(None);

// we are considering that we can always get from object store
if let Some(mut store_data) = storage_ingestor_metadata {
if store_data.domain_name != INGESTOR_META.domain_name {
store_data
.domain_name
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

// create the ingestor metadata and put the .ingestor.json file in the object store
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
let storage_ingestor_metadata = migrate_ingester_metadata().await?;
let store = CONFIG.storage().get_object_store();

// find the meta file in staging if not generate new metadata
let resource = INGESTOR_META.clone();
// use the id that was generated/found in the staging and
// generate the path for the object store
let path = ingestor_metadata_path(None);

// we are considering that we can always get from object store
if storage_ingestor_metadata.is_some() {
let mut store_data = storage_ingestor_metadata.unwrap();

if store_data.domain_name != INGESTOR_META.domain_name {
store_data
.domain_name
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
return store
.put_object(&path, resource)
.await
.map_err(|err| anyhow!(err));
}
} else {
let resource = Bytes::from(serde_json::to_vec(&resource)?);
let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
store.put_object(&path, resource).await?;
}
} else {
let resource = Bytes::from(serde_json::to_vec(&resource)?);

Ok(())
store.put_object(&path, resource).await?;
}

// check for querier state. Is it there, or was it there in the past
// this should happen before the set the ingestor metadata
async fn check_querier_state(&self) -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
// how do we check for querier state?
// based on the work flow of the system, the querier will always need to start first
// i.e the querier will create the `.parseable.json` file

let store = CONFIG.storage().get_object_store();
let path = parseable_json_path();
Ok(())
}

let parseable_json = store.get_object(&path).await;
match parseable_json {
Ok(_) => Ok(Some(parseable_json.unwrap())),
Err(_) => Err(ObjectStorageError::Custom(
// check for querier state. Is it there, or was it there in the past
// this should happen before the set the ingestor metadata
async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
// how do we check for querier state?
// based on the work flow of the system, the querier will always need to start first
// i.e the querier will create the `.parseable.json` file
let parseable_json = CONFIG
.storage()
.get_object_store()
.get_object(&parseable_json_path())
.await
.map_err(|_| {
ObjectStorageError::Custom(
"Query Server has not been started yet. Please start the querier server first."
.to_string(),
)),
}
}

async fn validate_credentials(&self) -> anyhow::Result<()> {
// check if your creds match with others
let store = CONFIG.storage().get_object_store();
let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let ingestor_metadata = store
.get_objects(
Some(&base_path),
Box::new(|file_name| file_name.starts_with("ingestor")),
)
.await?;
if !ingestor_metadata.is_empty() {
let ingestor_metadata_value: Value =
serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
let check = ingestor_metadata_value
.as_object()
.and_then(|meta| meta.get("token"))
.and_then(|token| token.as_str())
.unwrap();

let token = base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
));

let token = format!("Basic {}", token);

if check != token {
return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
}
}
})?;

Ok(Some(parseable_json))
}

Ok(())
async fn validate_credentials() -> anyhow::Result<()> {
// check if your creds match with others
let store = CONFIG.storage().get_object_store();
let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let ingestor_metadata = store
.get_objects(
Some(&base_path),
Box::new(|file_name| file_name.starts_with("ingestor")),
)
.await?;
if !ingestor_metadata.is_empty() {
let ingestor_metadata_value: Value =
serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
let check = ingestor_metadata_value
.as_object()
.and_then(|meta| meta.get("token"))
.and_then(|token| token.as_str())
.unwrap();

let token = base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
));

let token = format!("Basic {}", token);

if check != token {
return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
}
}

Ok(())
}
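
Beyond losing their `&self` receivers, `set_ingestor_metadata` and `check_querier_state` also pick up more idiomatic `Option`/`Result` handling: `if let Some(mut store_data)` replaces the `is_some()` / `unwrap()` pair, and a single `map_err` plus `?` replaces the hand-written `match` over the storage result. A standalone sketch of both idioms, with hypothetical stand-in types instead of the real storage calls:

```rust
struct Meta {
    domain_name: String, // hypothetical stand-in for the real ingestor metadata
}

// before: `if meta.is_some() { let m = meta.unwrap(); ... }`
// after:  bind and test in one step, with no unwrap
fn print_domain(meta: Option<Meta>) {
    if let Some(m) = meta {
        println!("{}", m.domain_name);
    }
}

// before: a match over the storage Result that rewraps each arm by hand
// after:  map the error once, then let `?` unwrap the success value
fn fetch(raw: Result<Vec<u8>, std::io::Error>) -> Result<Option<Vec<u8>>, String> {
    let bytes =
        raw.map_err(|_| "Query Server has not been started yet.".to_string())?;
    Ok(Some(bytes))
}

fn main() {
    print_domain(Some(Meta { domain_name: "ingestor-0".into() }));
    println!("{:?}", fetch(Ok(b"{}".to_vec())));
}
```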
4 changes: 2 additions & 2 deletions src/storage/staging.rs
@@ -129,7 +129,7 @@ impl StorageDir {
let paths = dir
.flatten()
.map(|file| file.path())
.filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows")))
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
.sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
.collect();

@@ -199,7 +199,7 @@ impl StorageDir {

dir.flatten()
.map(|file| file.path())
.filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet")))
.filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet")))
.collect()
}
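
Both `staging.rs` hunks make the same cleanup: `Option::is_some_and` (stable since Rust 1.70) states `map_or(false, …)` directly as "the value is `Some` and the predicate holds". A small sketch with a hypothetical helper around the same extension check:

```rust
use std::path::Path;

// before: path.extension().map_or(false, |ext| ext.eq("arrows"))
// after:  the same check, stated as "is Some and the predicate holds"
fn is_arrows_file(path: &Path) -> bool {
    path.extension().is_some_and(|ext| ext.eq("arrows"))
}

fn main() {
    assert!(is_arrows_file(Path::new("stream/data.arrows")));
    assert!(!is_arrows_file(Path::new("stream/data.parquet")));
}
```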
