48 changes: 25 additions & 23 deletions server/src/handlers/http.rs
@@ -18,8 +18,11 @@

use actix_cors::Cors;
use arrow_schema::Schema;
use itertools::Itertools;
use serde_json::Value;

use crate::option::CONFIG;

use self::{cluster::get_ingester_info, query::Query};

pub(crate) mod about;
@@ -61,32 +64,31 @@ pub fn base_path_without_preceding_slash() -> String {
format!("{API_BASE_PATH}/{API_VERSION}")
}

/// Fetches the schema for the specified stream.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to fetch the schema for.
///
/// # Returns
///
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
let mut res = vec![];
let ima = get_ingester_info().await.unwrap();

for im in ima {
let uri = format!(
"{}{}/logstream/{}/schema",
im.domain_name,
base_path_without_preceding_slash(),
stream_name
);
let reqw = reqwest::Client::new()
.get(uri)
.header(http::header::AUTHORIZATION, im.token.clone())
.header(http::header::CONTENT_TYPE, "application/json")
.send()
.await?;

if reqw.status().is_success() {
let v = serde_json::from_slice(&reqw.bytes().await?)?;
res.push(v);
}
}
let path_prefix =
relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, ".stream"));
let store = CONFIG.storage().get_object_store();
let res: Vec<Schema> = store
.get_objects(
Some(&path_prefix),
Box::new(|file_name: String| file_name.contains(".schema")),
)
.await?
.iter()
// we should be able to unwrap, as we know the data is a valid schema
.map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
.collect_vec();

let new_schema = Schema::try_merge(res)?;

Ok(new_schema)
}
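Note: `Schema::try_merge` above unions the fields of the per-stream schema files fetched from storage and fails on conflicting types. A minimal sketch of its behavior, with illustrative field names:

use arrow_schema::{DataType, Field, Schema};

// Two partial schemas, e.g. written by different ingesters.
let a = Schema::new(vec![Field::new("ts", DataType::Int64, false)]);
let b = Schema::new(vec![Field::new("msg", DataType::Utf8, true)]);

// try_merge unions the fields; a field typed differently in two
// schemas would make it return an error instead.
let merged = Schema::try_merge(vec![a, b]).expect("schemas are compatible");
assert_eq!(merged.fields().len(), 2);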

13 changes: 11 additions & 2 deletions server/src/handlers/http/cluster/mod.rs
@@ -49,6 +49,7 @@ use super::base_path_without_preceding_slash;
use super::modal::IngesterMetadata;

// forward the request to all ingesters to keep them in sync
#[allow(dead_code)]
pub async fn sync_streams_with_ingesters(
stream_name: &str,
time_partition: &str,
@@ -117,7 +118,10 @@ pub async fn fetch_stats_from_ingesters(
let obs = CONFIG
.storage()
.get_object_store()
.get_objects(Some(&path), ".ingester")
.get_objects(
Some(&path),
Box::new(|file_name| file_name.starts_with(".ingester")),
)
.await?;
let mut ingestion_size = 0u64;
let mut storage_size = 0u64;
@@ -140,6 +144,7 @@
Ok(vec![qs])
}

#[allow(dead_code)]
async fn send_stream_sync_request(
url: &str,
ingester: IngesterMetadata,
@@ -183,6 +188,7 @@ async fn send_stream_sync_request(
}

/// send a rollback request to all ingesters
#[allow(dead_code)]
async fn send_stream_rollback_request(
url: &str,
ingester: IngesterMetadata,
@@ -346,7 +352,10 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {

let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let arr = store
.get_objects(Some(&root_path), "ingester")
.get_objects(
Some(&root_path),
Box::new(|file_name| file_name.starts_with("ingester")),
)
.await?
.iter()
// this unwrap will most definitely shoot me in the foot later
30 changes: 24 additions & 6 deletions server/src/handlers/http/ingest.rs
@@ -27,9 +27,9 @@ use crate::handlers::{
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
STREAM_NAME_HEADER_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::metadata::{self, STREAM_INFO};
use crate::option::{Mode, CONFIG};
use crate::storage::ObjectStorageError;
use crate::storage::{LogStream, ObjectStorageError};
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_schema::{Field, Schema};
@@ -165,10 +165,28 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
.await?;
}
Mode::Ingest => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} not found. Has it been created?",
stream_name
)));
// the ingest server did not find the stream in memory,
// so check whether it exists in object storage
let store = CONFIG.storage().get_object_store();
let streams = store.list_streams().await?;
if !streams.contains(&LogStream {
name: stream_name.to_owned(),
}) {
log::error!("Stream {} not found", stream_name);
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} not found. Has it been created?",
stream_name
)));
}
metadata::STREAM_INFO
.upsert_stream_info(
&*store,
LogStream {
name: stream_name.to_owned(),
},
)
.await
.map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
}
}
Ok(())
5 changes: 1 addition & 4 deletions server/src/handlers/http/logstream.rs
@@ -26,8 +26,8 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use super::cluster::fetch_stats_from_ingesters;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_stats_from_ingesters, sync_streams_with_ingesters};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use arrow_schema::{Field, Schema};
@@ -166,9 +166,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
});
}

if CONFIG.parseable.mode == Mode::Query {
sync_streams_with_ingesters(&stream_name, time_partition, static_schema_flag, body).await?;
}
create_stream(stream_name, time_partition, static_schema_flag, schema).await?;

Ok(("log stream created", StatusCode::OK))
5 changes: 4 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
@@ -263,7 +263,10 @@ impl IngestServer {
let store = CONFIG.storage().get_object_store();
let base_path = RelativePathBuf::from("");
let ingester_metadata = store
.get_objects(Some(&base_path), "ingester")
.get_objects(
Some(&base_path),
Box::new(|file_name| file_name.starts_with("ingester")),
)
.await?
.iter()
// this unwrap will most definitely shoot me in the foot later
1 change: 1 addition & 0 deletions server/src/handlers/http/query.rs
@@ -74,6 +74,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&table_name).await {
// commit_schema_to_storage merges the schema internally and updates it in storage.
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
61 changes: 35 additions & 26 deletions server/src/metadata.rs
@@ -26,7 +26,7 @@ use std::sync::{Arc, RwLock};

use crate::alerts::Alerts;
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
use crate::storage::{ObjectStorage, StorageDir};
use crate::storage::{LogStream, ObjectStorage, StorageDir};
use crate::utils::arrow::MergedRecordReader;

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -208,32 +208,41 @@ impl StreamInfo {
// return error in case of an error from object storage itself.

for stream in storage.list_streams().await? {
let alerts = storage.get_alerts(&stream.name).await?;
let schema = storage.get_schema_on_server_start(&stream.name).await?;
let meta = storage.get_stream_metadata(&stream.name).await?;

let schema = update_schema_from_staging(&stream.name, schema);
let schema = HashMap::from_iter(
schema
.fields
.iter()
.map(|v| (v.name().to_owned(), v.clone())),
);

let metadata = LogStreamMetadata {
schema,
alerts,
cache_enabled: meta.cache_enabled,
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
static_schema_flag: meta.static_schema_flag,
};

let mut map = self.write().expect(LOCK_EXPECT);

map.insert(stream.name, metadata);
self.upsert_stream_info(storage, stream).await?;
}
Ok(())
}

pub async fn upsert_stream_info(
&self,
storage: &(impl ObjectStorage + ?Sized),
stream: LogStream,
) -> Result<(), LoadError> {
let alerts = storage.get_alerts(&stream.name).await?;
let schema = storage.get_schema_on_server_start(&stream.name).await?;
let meta = storage.get_stream_metadata(&stream.name).await?;

let schema = update_schema_from_staging(&stream.name, schema);
let schema = HashMap::from_iter(
schema
.fields
.iter()
.map(|v| (v.name().to_owned(), v.clone())),
);

let metadata = LogStreamMetadata {
schema,
alerts,
cache_enabled: meta.cache_enabled,
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
static_schema_flag: meta.static_schema_flag,
};

let mut map = self.write().expect(LOCK_EXPECT);

map.insert(stream.name, metadata);

Ok(())
}
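A usage sketch of the new `upsert_stream_info` helper, mirroring the call added in `ingest.rs` above (the stream name is illustrative):

// Hydrate the in-memory map for a single stream found in object storage.
let store = CONFIG.storage().get_object_store();
STREAM_INFO
    .upsert_stream_info(
        &*store,
        LogStream {
            name: "app_logs".to_owned(),
        },
    )
    .await?;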
13 changes: 11 additions & 2 deletions server/src/option.rs
@@ -42,7 +42,15 @@ pub struct Config {

impl Config {
fn new() -> Self {
let cli = create_parseable_cli_command().get_matches();
let cli = create_parseable_cli_command()
.name("Parseable")
.about("A Cloud Native, log analytics platform")
.before_help("Log Lake for the cloud-native world")
.arg_required_else_help(true)
.subcommand_required(true)
.color(clap::ColorChoice::Always)
.get_matches();

match cli.subcommand() {
Some(("local-store", m)) => {
let cli = match Cli::from_arg_matches(m) {
@@ -181,7 +189,8 @@ fn create_parseable_cli_command() -> Command {
.next_line_help(false)
.help_template(
r#"
{about} Join the community at https://logg.ing/community.
{about}
Join the community at https://logg.ing/community.

{all-args}
"#,
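For context, a minimal sketch of what the builder flags added in `Config::new` do, assuming clap's builder API:

use clap::Command;

let cmd = Command::new("Parseable")
    // with no arguments at all, print help and exit with an error code
    .arg_required_else_help(true)
    // a subcommand (e.g. `local-store`) must be supplied
    .subcommand_required(true);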
2 changes: 1 addition & 1 deletion server/src/storage.rs
@@ -167,7 +167,7 @@ impl ObjectStoreFormat {
}
}

#[derive(serde::Serialize)]
#[derive(serde::Serialize, PartialEq)]
pub struct LogStream {
pub name: String,
}
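Deriving `PartialEq` here is what enables the `streams.contains(...)` check added in `ingest.rs`, since `Vec::contains` requires it; a minimal sketch with an illustrative name:

let streams = vec![LogStream {
    name: "app_logs".to_owned(),
}];
assert!(streams.contains(&LogStream {
    name: "app_logs".to_owned(),
}));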
11 changes: 6 additions & 5 deletions server/src/storage/localfs.rs
@@ -193,7 +193,7 @@ impl ObjectStorage for LocalFS {
async fn get_objects(
&self,
base_path: Option<&RelativePath>,
_starts_with_pattern: &str,
filter_func: Box<(dyn Fn(String) -> bool + std::marker::Send + 'static)>,
) -> Result<Vec<Bytes>, ObjectStorageError> {
let time = Instant::now();

@@ -206,13 +206,14 @@
let mut entries = fs::read_dir(&prefix).await?;
let mut res = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let ingester_file = entry
let path = entry
.path()
.file_name()
.unwrap_or_default()
.unwrap()
.to_str()
.unwrap_or_default()
.contains("ingester");
.unwrap()
.to_owned();
let ingester_file = filter_func(path);

if !ingester_file {
continue;
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
@@ -69,7 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn get_objects(
&self,
base_path: Option<&RelativePath>,
starts_with_pattern: &str,
filter_func: Box<dyn Fn(String) -> bool + Send>,
) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn put_object(
&self,
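With this signature change, callers pass an owned closure instead of a prefix string. A sketch of the two call shapes this PR uses, assuming `store`, `root_path`, and `path_prefix` are in scope:

// Match ingester metadata files by prefix...
let metas = store
    .get_objects(
        Some(&root_path),
        Box::new(|name: String| name.starts_with("ingester")),
    )
    .await?;

// ...or match schema files anywhere in the file name.
let schemas = store
    .get_objects(
        Some(&path_prefix),
        Box::new(|name: String| name.contains(".schema")),
    )
    .await?;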
9 changes: 2 additions & 7 deletions server/src/storage/s3.rs
@@ -412,11 +412,10 @@ impl ObjectStorage for S3 {
Ok(self._get_object(path).await?)
}

// TBD is this the right way or the api calls are too many?
async fn get_objects(
&self,
base_path: Option<&RelativePath>,
starts_with_pattern: &str,
filter_func: Box<dyn Fn(String) -> bool + Send>,
) -> Result<Vec<Bytes>, ObjectStorageError> {
let instant = Instant::now();

Expand All @@ -431,11 +430,7 @@ impl ObjectStorage for S3 {
let mut res = vec![];

while let Some(meta) = list_stream.next().await.transpose()? {
let ingester_file = meta
.location
.filename()
.unwrap()
.starts_with(starts_with_pattern);
let ingester_file = filter_func(meta.location.filename().unwrap().to_string());

if !ingester_file {
continue;