server/src/event.rs: 6 additions & 1 deletion
@@ -27,7 +27,7 @@ use std::sync::Arc;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::{handlers::http::ingest::PostError, metadata};
use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
use chrono::NaiveDateTime;
use std::collections::HashMap;

@@ -45,6 +45,7 @@ pub struct Event {
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
pub custom_partition_values: HashMap<String, String>,
pub stream_type: StreamType,
}

// Events holds the schema related to a each event for a single log stream
@@ -75,6 +76,7 @@ impl Event {
self.rb.clone(),
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
)?;

metadata::STREAM_INFO.update_stats(
@@ -106,6 +108,7 @@ impl Event {
self.rb.clone(),
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
)
.map_err(PostError::Event)
}
@@ -122,13 +125,15 @@ impl Event {
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
stream_type: &StreamType,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(
stream_name,
schema_key,
rb,
parsed_timestamp,
custom_partition_values.clone(),
stream_type,
)?;
Ok(())
}
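Note: the StreamType enum referenced throughout this changeset lives in server/src/storage and is not shown in the diff. A minimal sketch of what it plausibly looks like, assuming a UserDefined/Internal split and a Display impl (needed for the StreamType::UserDefined.to_string() calls further down); the derives and exact variant names are assumptions:

use std::fmt;

// Hypothetical sketch of the StreamType defined in server/src/storage;
// the real definition is not part of this diff.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamType {
    UserDefined,
    Internal,
}

impl fmt::Display for StreamType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StreamType::UserDefined => write!(f, "UserDefined"),
            StreamType::Internal => write!(f, "Internal"),
        }
    }
}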
server/src/event/writer.rs: 11 additions & 4 deletions
@@ -26,8 +26,8 @@ use std::{
};

use crate::{
handlers::http::cluster::INTERNAL_STREAM_NAME,
option::{Mode, CONFIG},
storage::StreamType,
utils,
};

@@ -92,6 +92,7 @@ impl WriterTable {
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: HashMap<String, String>,
stream_type: &StreamType,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

@@ -104,6 +105,7 @@
record,
parsed_timestamp,
&custom_partition_values,
stream_type,
)?;
}
None => {
@@ -118,12 +120,14 @@
record,
parsed_timestamp,
&custom_partition_values,
stream_type,
)?;
}
};
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn handle_existing_writer(
&self,
stream_writer: &Mutex<Writer>,
@@ -132,8 +136,9 @@
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
stream_type: &StreamType,
) -> Result<(), StreamWriterError> {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
@@ -151,6 +156,7 @@
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn handle_missing_writer(
&self,
mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
@@ -159,10 +165,11 @@
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
stream_type: &StreamType,
) -> Result<(), StreamWriterError> {
match map.get(stream_name) {
Some(writer) => {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
writer.lock().unwrap().push(
stream_name,
schema_key,
@@ -175,7 +182,7 @@
}
}
None => {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
let mut writer = Writer::default();
writer.push(
stream_name,
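For readability: the guard repeated in handle_existing_writer and handle_missing_writer means a node only buffers events in its local writer when it is not running in Query mode, except that internal streams (such as cluster metrics) are always written locally. A hypothetical helper expressing the same predicate, using the crate's own Mode and StreamType types:

use crate::{option::Mode, storage::StreamType};

// Hypothetical helper; it mirrors the condition used above and is not code from this PR.
fn should_write_locally(mode: &Mode, stream_type: &StreamType) -> bool {
    *mode != Mode::Query || *stream_type == StreamType::Internal
}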
server/src/handlers.rs: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
const STREAM_TYPE_KEY: &str = "x-p-stream-type";
const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
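The new x-p-stream-type header gives callers (and the query node when it forwards stream creation to ingestors) a way to tag a stream as internal or user-defined at creation time. A client-side sketch of setting it on a stream-creation request; the endpoint path, port, credentials, and header value here are assumptions, not taken from this diff:

// Hypothetical client sketch; adjust URL, credentials, and value to your deployment.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .header("x-p-stream-type", "UserDefined") // STREAM_TYPE_KEY from handlers.rs
        .basic_auth("admin", Some("admin"))
        .send()
        .await?;
    println!("create stream -> {}", resp.status());
    Ok(())
}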
server/src/handlers/http/cluster/mod.rs: 10 additions & 8 deletions
@@ -30,11 +30,11 @@ use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use actix_web::http::header;
use actix_web::http::header::{self, HeaderMap};
use actix_web::{HttpRequest, Responder};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use http::{header as http_header, StatusCode};
use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::de::Error;
@@ -96,10 +96,15 @@ pub async fn sync_cache_with_ingestors(

// forward the request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
req: HttpRequest,
headers: HeaderMap,
body: Bytes,
stream_name: &str,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();

for (key, value) in headers.iter() {
reqwest_headers.insert(key.clone(), value.clone());
}
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
@@ -119,7 +124,7 @@
);
let res = client
.put(url)
.headers(req.headers().into())
.headers(reqwest_headers.clone())
.header(header::AUTHORIZATION, &ingestor.token)
.body(body.clone())
.send()
@@ -572,7 +577,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {

pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
log::info!("Setting up schedular for cluster metrics ingestion");

let mut scheduler = AsyncScheduler::new();
scheduler
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
@@ -583,11 +587,9 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
if !metrics.is_empty() {
log::info!("Cluster metrics fetched successfully from all ingestors");
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
let stream_name = INTERNAL_STREAM_NAME;

if matches!(
ingest_internal_stream(
stream_name.to_string(),
INTERNAL_STREAM_NAME.to_string(),
bytes::Bytes::from(metrics_bytes),
)
.await,
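The header-copy loop in sync_streams_with_ingestors works because actix-web 4's HeaderMap and the http crate's HeaderMap (the type reqwest accepts) share the same HeaderName/HeaderValue types, so entries can simply be cloned across. A standalone sketch of that conversion, assuming actix-web 4; note that insert keeps only the last value for a repeated header name, while append would preserve duplicates:

use actix_web::http::header::HeaderMap as ActixHeaderMap;
use http::header::HeaderMap as HttpHeaderMap;

// Hypothetical helper mirroring the inline loop above.
fn to_http_headers(headers: &ActixHeaderMap) -> HttpHeaderMap {
    let mut out = HttpHeaderMap::new();
    for (key, value) in headers.iter() {
        // HeaderName/HeaderValue are the same http-crate types in both maps.
        out.insert(key.clone(), value.clone());
    }
    out
}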
server/src/handlers/http/ingest.rs: 17 additions & 11 deletions
@@ -16,8 +16,7 @@
*
*/

use super::cluster::INTERNAL_STREAM_NAME;
use super::logstream::error::CreateStreamError;
use super::logstream::error::{CreateStreamError, StreamError};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use super::{kinesis, otel};
@@ -34,7 +33,7 @@ use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{self, STREAM_INFO};
use crate::option::{Mode, CONFIG};
use crate::storage::{LogStream, ObjectStorageError};
use crate::storage::{LogStream, ObjectStorageError, StreamType};
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
use crate::utils::json::convert_array_to_object;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -57,13 +56,14 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
if stream_name.eq(INTERNAL_STREAM_NAME) {
let internal_stream_names = STREAM_INFO.list_internal_streams();
if internal_stream_names.contains(&stream_name) {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} is an internal stream and cannot be ingested into",
"The stream {} is reserved for internal use and cannot be ingested into",
stream_name
)));
}
create_stream_if_not_exists(&stream_name, false).await?;
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
@@ -73,7 +73,6 @@
}

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
create_stream_if_not_exists(&stream_name, true).await?;
let size: usize = body.len();
let parsed_timestamp = Utc::now().naive_utc();
let (rb, is_first) = {
@@ -100,6 +99,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
parsed_timestamp,
time_partition: None,
custom_partition_values: HashMap::new(),
stream_type: StreamType::Internal,
}
.process()
.await?;
@@ -116,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, false).await?;
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -176,7 +176,8 @@ async fn flatten_and_push_logs(
// fails if the logstream does not exist
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
if stream_name.eq(INTERNAL_STREAM_NAME) {
let internal_stream_names = STREAM_INFO.list_internal_streams();
if internal_stream_names.contains(&stream_name) {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} is an internal stream and cannot be ingested into",
stream_name
@@ -202,6 +203,7 @@ pub async fn push_logs_unchecked(
time_partition: None,
is_first_event: true, // NOTE: Maybe should be false
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
stream_type: StreamType::UserDefined,
};
unchecked_event.process_unchecked()?;

@@ -369,6 +371,7 @@ async fn create_process_record_batch(
parsed_timestamp,
time_partition: time_partition.clone(),
custom_partition_values: custom_partition_values.clone(),
stream_type: StreamType::UserDefined,
}
.process()
.await?;
@@ -413,7 +416,7 @@ fn into_event_batch(
// Check if the stream exists and create a new stream if doesn't exist
pub async fn create_stream_if_not_exists(
stream_name: &str,
internal_stream: bool,
stream_type: &str,
) -> Result<(), PostError> {
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
@@ -427,7 +430,7 @@
"",
"",
Arc::new(Schema::empty()),
internal_stream,
stream_type,
)
.await?;
}
@@ -488,6 +491,8 @@ pub enum PostError {
DashboardError(#[from] DashboardError),
#[error("Error: {0}")]
CacheError(#[from] CacheError),
#[error("Error: {0}")]
StreamError(#[from] StreamError),
}

impl actix_web::ResponseError for PostError {
@@ -509,6 +514,7 @@ impl actix_web::ResponseError for PostError {
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
