129 changes: 88 additions & 41 deletions server/src/catalog.rs
@@ -20,9 +20,12 @@ use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::CONFIG;
use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
use crate::stats::{
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
};
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
@@ -103,9 +106,8 @@ pub async fn update_snapshot(
change: manifest::File,
) -> Result<(), ObjectStorageError> {
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = &meta_clone.time_partition;
let time_partition = &meta.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
@@ -174,12 +176,17 @@
}
}

meta.snapshot.manifest_list = manifests.to_vec();
storage.put_snapshot(stream_name, meta.snapshot).await?;
if ch {
if let Some(mut manifest) = storage.get_manifest(&path).await? {
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
let stats = get_current_stats(stream_name, "json");
if let Some(stats) = stats {
meta.stats = stats;
}
meta.snapshot.manifest_list = manifests.to_vec();

storage.put_stream_manifest(stream_name, &meta).await?;
} else {
//instead of returning an error, create a new manifest (otherwise local to storage sync fails)
//but don't update the snapshot
@@ -189,7 +196,7 @@
storage.clone(),
stream_name,
false,
meta_clone,
meta,
events_ingested,
ingestion_size,
storage_size,
@@ -203,7 +210,7 @@
storage.clone(),
stream_name,
true,
meta_clone,
meta,
events_ingested,
ingestion_size,
storage_size,
@@ -217,7 +224,7 @@
storage.clone(),
stream_name,
true,
meta_clone,
meta,
events_ingested,
ingestion_size,
storage_size,
@@ -256,6 +263,30 @@ async fn create_manifest(
files: vec![change],
..Manifest::default()
};
let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?;
if first_event_at.is_none() {
if let Some(first_event) = manifest.files.first() {
let time_partition = &meta.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
log::error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
}

let mainfest_file_name = manifest_path("").to_string();
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
@@ -275,7 +306,12 @@
};
manifests.push(new_snapshot_entry);
meta.snapshot.manifest_list = manifests;
storage.put_snapshot(stream_name, meta.snapshot).await?;
let stats = get_current_stats(stream_name, "json");
if let Some(stats) = stats {
meta.stats = stats;
}
meta.first_event_at = first_event_at;
storage.put_stream_manifest(stream_name, &meta).await?;
}

Ok(())
@@ -294,6 +330,8 @@ pub async fn remove_manifest_from_snapshot(
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
meta.first_event_at = None;
STREAM_INFO.set_first_event_at(stream_name, None)?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.parseable.mode {
Expand All @@ -313,39 +351,48 @@ pub async fn get_first_event(
match CONFIG.parseable.mode {
Mode::All | Mode::Ingest => {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
if let Some(first_event) = manifest.files.first() {
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
let stream_first_event = STREAM_INFO.get_first_event(stream_name)?;
if stream_first_event.is_some() {
first_event_at = stream_first_event.unwrap();
} else {
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = meta_clone.snapshot.manifest_list;
let time_partition = meta_clone.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
if let Some(first_event) = manifest.files.first() {
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
}
}
}
Mode::Query => {
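The catalog.rs changes above cache `first_event_at` in STREAM_INFO and persist it (together with refreshed stats) via `put_stream_manifest`, so later reads can avoid re-scanning manifests in object storage. Below is a minimal, self-contained sketch of that cache-then-fallback lookup; the types are simplified stand-ins (a HashMap behind a RwLock), not the actual Parseable API.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-ins for STREAM_INFO and the object store; the real
// Parseable types differ, this only illustrates the lookup order.
struct StreamInfo(RwLock<HashMap<String, Option<String>>>);

impl StreamInfo {
    fn get_first_event(&self, stream: &str) -> Option<String> {
        self.0.read().unwrap().get(stream).cloned().flatten()
    }
    fn set_first_event_at(&self, stream: &str, value: Option<String>) {
        self.0.write().unwrap().insert(stream.to_string(), value);
    }
}

// Pretend fetch from the stream manifest in object storage (slow path).
fn load_first_event_from_storage(_stream: &str) -> Option<String> {
    Some("2024-05-01T00:00:00+00:00".to_string())
}

fn first_event(info: &StreamInfo, stream: &str) -> Option<String> {
    // Fast path: the value cached in memory at ingest/manifest-creation time.
    if let Some(cached) = info.get_first_event(stream) {
        return Some(cached);
    }
    // Slow path: read the stream manifest once, then cache the result.
    let fetched = load_first_event_from_storage(stream)?;
    info.set_first_event_at(stream, Some(fetched.clone()));
    Some(fetched)
}

fn main() {
    let info = StreamInfo(RwLock::new(HashMap::new()));
    println!("{:?}", first_event(&info, "demo")); // hits storage, then caches
    println!("{:?}", first_event(&info, "demo")); // served from memory
}
```

When retention removes manifests, `remove_manifest_from_snapshot` resets both the stored and the in-memory value to `None`, so the next lookup recomputes it from whatever data remains.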
5 changes: 4 additions & 1 deletion server/src/event/format/json.rs
@@ -221,6 +221,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
}
}
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
_ => unreachable!(),
_ => {
log::error!("Unsupported datatype {:?}, value {:?}", data_type, value);
unreachable!()
}
}
}
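The json.rs change above only adds context before the panic: the offending datatype and value are logged before `unreachable!()` fires. A tiny sketch of the same pattern with simplified stand-in types (the real function matches on Arrow's `DataType` and a `serde_json::Value`, and uses `log::error!` rather than `eprintln!`):

```rust
// Simplified stand-ins; only the logging-before-unreachable pattern matters here.
#[derive(Debug)]
enum Kind {
    Utf8,
    Int64,
    Binary, // stands in for the datatypes the ingest path does not support
}

fn valid_type(kind: &Kind, value: &str) -> bool {
    match kind {
        Kind::Utf8 => true,
        Kind::Int64 => value.parse::<i64>().is_ok(),
        other => {
            // Record what actually arrived before panicking, so a hit on this
            // "impossible" branch in production is debuggable from the logs.
            eprintln!("Unsupported datatype {:?}, value {:?}", other, value);
            unreachable!()
        }
    }
}

fn main() {
    assert!(valid_type(&Kind::Int64, "42"));
    assert!(valid_type(&Kind::Utf8, "hello"));
    // valid_type(&Kind::Binary, "...") would log the input and then panic.
}
```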
33 changes: 17 additions & 16 deletions server/src/handlers/http/ingest.rs
@@ -31,6 +31,7 @@ use crate::handlers::{
STREAM_NAME_HEADER_KEY,
};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{self, STREAM_INFO};
use crate::option::{Mode, CONFIG};
use crate::storage::{LogStream, ObjectStorageError};
@@ -62,7 +63,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
stream_name
)));
}
create_stream_if_not_exists(&stream_name).await?;
create_stream_if_not_exists(&stream_name, false).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
@@ -72,7 +73,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
}

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
create_stream_if_not_exists(&stream_name).await?;
create_stream_if_not_exists(&stream_name, true).await?;
let size: usize = body.len();
let parsed_timestamp = Utc::now().naive_utc();
let (rb, is_first) = {
@@ -115,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name).await?;
create_stream_if_not_exists(&stream_name, false).await?;

//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -128,7 +129,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
log::warn!("Unknown log source: {}", log_source);
return Err(PostError::CustomError("Unknown log source".to_string()));
}
} else {
@@ -206,16 +206,10 @@ pub async fn push_logs_unchecked(
}

async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
let glob_storage = CONFIG.storage().get_object_store();
let object_store_format = glob_storage
.get_object_store_format(&stream_name)
.await
.map_err(|_| PostError::StreamNotFound(stream_name.clone()))?;

let time_partition = object_store_format.time_partition;
let time_partition_limit = object_store_format.time_partition_limit;
let static_schema_flag = object_store_format.static_schema_flag;
let custom_partition = object_store_format.custom_partition;
let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
let mut parsed_timestamp = Utc::now().naive_utc();
@@ -414,7 +408,10 @@ fn into_event_batch(
}

// Check if the stream exists and create a new stream if doesn't exist
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
pub async fn create_stream_if_not_exists(
stream_name: &str,
internal_stream: bool,
) -> Result<(), PostError> {
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}
@@ -427,6 +424,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
"",
"",
Arc::new(Schema::empty()),
internal_stream,
)
.await?;
}
@@ -440,7 +438,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
}) {
log::error!("Stream {} not found", stream_name);
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} not found. Has it been created?",
"Stream `{}` not found. Please create it using the Query server.",
stream_name
)));
}
@@ -472,6 +470,8 @@ pub enum PostError {
Invalid(#[from] anyhow::Error),
#[error("{0}")]
CreateStream(#[from] CreateStreamError),
#[error("Error: {0}")]
MetadataStreamError(#[from] MetadataError),
#[allow(unused)]
#[error("Error: {0}")]
CustomError(String),
@@ -498,6 +498,7 @@ impl actix_web::ResponseError for PostError {
StatusCode::BAD_REQUEST
}
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::MetadataStreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
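The ingest.rs changes above replace per-request `get_object_store_format` calls with in-memory STREAM_INFO lookups and thread a new `internal_stream` flag through `create_stream_if_not_exists`. Below is a minimal, self-contained sketch of that shape; the registry, config fields, and error type are simplified assumptions, not the actual Parseable implementation.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-in for STREAM_INFO: per-stream configuration kept in memory
// so the ingest hot path reads a map instead of doing an object-store GET per
// request. The real registry also tracks time_partition_limit and
// custom_partition; they are omitted here for brevity.
#[derive(Clone, Default)]
struct StreamConfig {
    time_partition: Option<String>,
    static_schema_flag: Option<String>,
}

#[derive(Default)]
struct StreamRegistry(RwLock<HashMap<String, StreamConfig>>);

#[derive(Debug)]
enum PostError {
    StreamNotFound(String),
}

impl StreamRegistry {
    fn stream_exists(&self, name: &str) -> bool {
        self.0.read().unwrap().contains_key(name)
    }

    // Mirrors the shape of create_stream_if_not_exists(stream_name, internal_stream);
    // the new boolean only changes how a missing stream gets created.
    fn create_stream_if_not_exists(&self, name: &str, internal_stream: bool) {
        if self.stream_exists(name) {
            return;
        }
        let cfg = StreamConfig {
            // Assumption for the sketch: internal streams are created with a
            // fixed schema flag; regular streams start from defaults.
            static_schema_flag: internal_stream.then(|| "true".to_string()),
            ..StreamConfig::default()
        };
        self.0.write().unwrap().insert(name.to_string(), cfg);
    }

    fn get_time_partition(&self, name: &str) -> Result<Option<String>, PostError> {
        self.read_field(name, |cfg| cfg.time_partition.clone())
    }

    fn get_static_schema_flag(&self, name: &str) -> Result<Option<String>, PostError> {
        self.read_field(name, |cfg| cfg.static_schema_flag.clone())
    }

    // A missing entry becomes an error instead of a panic.
    fn read_field<T>(&self, name: &str, f: impl Fn(&StreamConfig) -> T) -> Result<T, PostError> {
        self.0
            .read()
            .unwrap()
            .get(name)
            .map(f)
            .ok_or_else(|| PostError::StreamNotFound(name.to_string()))
    }
}

fn main() -> Result<(), PostError> {
    let registry = StreamRegistry::default();
    registry.create_stream_if_not_exists("demo", true);
    // Per-request lookups are now in-memory reads, not object-storage round trips.
    println!("time_partition     = {:?}", registry.get_time_partition("demo")?);
    println!("static_schema_flag = {:?}", registry.get_static_schema_flag("demo")?);
    Ok(())
}
```

Keeping stream configuration in memory removes an object-storage round trip from every ingest request; lookup failures surface as an error rather than a panic (a not-found variant in this sketch, and in the PR the new `PostError::MetadataStreamError`, returned as HTTP 500).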