From df70d0ec27da9d6ca1c1b1081627ee47ee172935 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 22:50:09 +0530 Subject: [PATCH 01/19] refactor: kinesis message construction may panic --- src/handlers/http/kinesis.rs | 17 +++++++---------- src/handlers/http/modal/utils/ingest_utils.rs | 11 +++++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index 084f686cd..e2f245f73 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -17,15 +17,13 @@ */ use base64::{engine::general_purpose::STANDARD, Engine as _}; -use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; use std::str; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct Message { +pub struct Message { records: Vec, request_id: String, timestamp: u64, @@ -59,16 +57,14 @@ struct Data { // "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a", // "timestamp": "1704964113659" // } -pub fn flatten_kinesis_logs(body: &Bytes) -> Vec> { - let body_str = std::str::from_utf8(body).unwrap(); - let message: Message = serde_json::from_str(body_str).unwrap(); - let mut vec_kinesis_json: Vec> = Vec::new(); +pub fn flatten_kinesis_logs(message: Message) -> Vec { + let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { let bytes = STANDARD.decode(record.data.clone()).unwrap(); let json_string: String = String::from_utf8(bytes).unwrap(); let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); - let mut kinesis_json: BTreeMap = match serde_json::from_value(json) { + let mut kinesis_json: Map = match serde_json::from_value(json) { Ok(value) => value, Err(error) => panic!("Failed to deserialize JSON: {}", error), }; @@ -82,7 +78,8 @@ pub fn flatten_kinesis_logs(body: &Bytes) -> Vec> { Value::String(message.timestamp.to_string()), ); - vec_kinesis_json.push(kinesis_json); + vec_kinesis_json.push(Value::Object(kinesis_json)); } + vec_kinesis_json } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 8a13bcefd..14705b13c 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -31,7 +31,10 @@ use crate::{ Event, }, handlers::{ - http::{ingest::PostError, kinesis}, + http::{ + ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, + }, LOG_SOURCE_KEY, }, metadata::{SchemaVersion, STREAM_INFO}, @@ -53,9 +56,9 @@ pub async fn flatten_and_push_logs( match log_source { LogSource::Kinesis => { - let json = kinesis::flatten_kinesis_logs(&body); - for record in json.iter() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + let message: Message = serde_json::from_slice(&body)?; + for record in flatten_kinesis_logs(message) { + let body: Bytes = serde_json::to_vec(&record).unwrap().into(); push_logs(stream_name, &body, &LogSource::default()).await?; } } From c57d931cd17c4e14abbba293b031e382e4e78e33 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 23:00:08 +0530 Subject: [PATCH 02/19] refactor: replace `BTreeMap` with `serde_json::Map` --- src/handlers/http/ingest.rs | 15 +++---- src/otel/logs.rs | 24 +++++------ src/otel/metrics.rs | 80 ++++++++++++++++++------------------- src/otel/otel_utils.rs | 33 ++++++--------- src/otel/traces.rs | 45 ++++++++++----------- 5 files changed, 92 insertions(+), 105 deletions(-) diff --git 
a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 8b437f862..6c4064f3e 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -144,9 +144,8 @@ pub async fn handle_otel_logs_ingestion( //custom flattening required for otel logs let logs: LogsData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_logs(&logs); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + for record in flatten_otel_logs(&logs) { + let body: Bytes = serde_json::to_vec(&record).unwrap().into(); push_logs(&stream_name, &body, &log_source).await?; } @@ -182,9 +181,8 @@ pub async fn handle_otel_metrics_ingestion( //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_metrics(metrics); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + for record in flatten_otel_metrics(metrics) { + let body: Bytes = serde_json::to_vec(&record).unwrap().into(); push_logs(&stream_name, &body, &log_source).await?; } @@ -221,9 +219,8 @@ pub async fn handle_otel_traces_ingestion( //custom flattening required for otel traces let traces: TracesData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_traces(&traces); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + for record in flatten_otel_traces(&traces) { + let body: Bytes = serde_json::to_vec(&record).unwrap().into(); push_logs(&stream_name, &body, &log_source).await?; } diff --git a/src/otel/logs.rs b/src/otel/logs.rs index fcdffe1af..1e5b413cd 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -20,8 +20,8 @@ use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; use opentelemetry_proto::tonic::logs::v1::SeverityNumber; +use serde_json::Map; use serde_json::Value; -use std::collections::BTreeMap; use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; @@ -31,8 +31,8 @@ use super::otel_utils::insert_attributes; /// there is a mapping of severity number to severity text provided in proto /// this function fetches the severity text from the severity number /// and adds it to the flattened json -fn flatten_severity(severity_number: i32) -> BTreeMap { - let mut severity_json: BTreeMap = BTreeMap::new(); +fn flatten_severity(severity_number: i32) -> Map { + let mut severity_json: Map = Map::new(); severity_json.insert( "severity_number".to_string(), Value::Number(severity_number.into()), @@ -46,10 +46,10 @@ fn flatten_severity(severity_number: i32) -> BTreeMap { } /// this function flattens the `LogRecord` object -/// and returns a `BTreeMap` of the flattened json +/// and returns a `Map` of the flattened json /// this function is called recursively for each log record object in the otel logs -pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { - let mut log_record_json: BTreeMap = BTreeMap::new(); +pub fn flatten_log_record(log_record: &LogRecord) -> Map { + let mut log_record_json: Map = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -95,10 +95,10 @@ pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { } /// this function flattens the `ScopeLogs` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn 
flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { let mut vec_scope_log_json = Vec::new(); - let mut scope_log_json = BTreeMap::new(); + let mut scope_log_json = Map::new(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); @@ -128,11 +128,11 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { } /// this function performs the custom flattening of the otel logs -/// and returns a `Vec` of `BTreeMap` of the flattened json -pub fn flatten_otel_logs(message: &LogsData) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +pub fn flatten_otel_logs(message: &LogsData) -> Vec> { let mut vec_otel_json = Vec::new(); for record in &message.resource_logs { - let mut resource_log_json = BTreeMap::new(); + let mut resource_log_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_log_json, &resource.attributes); diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index f5aa1c072..cebddd117 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -16,14 +16,12 @@ * */ -use std::collections::BTreeMap; - use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary, }; -use serde_json::Value; +use serde_json::{Map, Value}; use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, @@ -31,10 +29,10 @@ use super::otel_utils::{ /// otel metrics event has json array for exemplar /// this function flatten the exemplar json array -/// and returns a `BTreeMap` of the exemplar json +/// and returns a `Map` of the exemplar json /// this function is reused in all json objects that have exemplar -fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { - let mut exemplar_json = BTreeMap::new(); +fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { + let mut exemplar_json = Map::new(); for exemplar in exemplars { insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); exemplar_json.insert( @@ -73,13 +71,13 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { /// otel metrics event has json array for number data points /// this function flatten the number data points json array -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points -fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { +fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { data_points .iter() .map(|data_point| { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -122,12 +120,12 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_gauge(gauge: &Gauge) -> Vec> { let mut vec_gauge_json = Vec::new(); let data_points_json = flatten_number_data_points(&gauge.data_points); for data_point_json in data_points_json { - let mut gauge_json = BTreeMap::new(); + let mut 
gauge_json = Map::new(); for (key, value) in &data_point_json { gauge_json.insert(key.clone(), value.clone()); } @@ -139,18 +137,18 @@ fn flatten_gauge(gauge: &Gauge) -> Vec> { /// otel metrics event has json object for sum /// each sum object has json array for data points /// this function flatten the sum json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_sum(sum: &Sum) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_sum(sum: &Sum) -> Vec> { let mut vec_sum_json = Vec::new(); let data_points_json = flatten_number_data_points(&sum.data_points); for data_point_json in data_points_json { - let mut sum_json = BTreeMap::new(); + let mut sum_json = Map::new(); for (key, value) in &data_point_json { sum_json.insert(key.clone(), value.clone()); } vec_sum_json.push(sum_json); } - let mut sum_json = BTreeMap::new(); + let mut sum_json = Map::new(); sum_json.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); sum_json.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); for data_point_json in &mut vec_sum_json { @@ -164,11 +162,11 @@ fn flatten_sum(sum: &Sum) -> Vec> { /// otel metrics event has json object for histogram /// each histogram object has json array for data points /// this function flatten the histogram json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_histogram(histogram: &Histogram) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_histogram(histogram: &Histogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -216,7 +214,7 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { insert_number_if_some(&mut data_point_json, "max", &data_point.max); data_points_json.push(data_point_json); } - let mut histogram_json = BTreeMap::new(); + let mut histogram_json = Map::new(); histogram_json.extend(flatten_aggregation_temporality( histogram.aggregation_temporality, )); @@ -230,9 +228,9 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { /// otel metrics event has json object for buckets /// this function flatten the buckets json object -/// and returns a `BTreeMap` of the flattened json -fn flatten_buckets(bucket: &Buckets) -> BTreeMap { - let mut bucket_json = BTreeMap::new(); +/// and returns a `Map` of the flattened json +fn flatten_buckets(bucket: &Buckets) -> Map { + let mut bucket_json = Map::new(); bucket_json.insert("offset".to_string(), Value::Number(bucket.offset.into())); bucket_json.insert( "bucket_count".to_string(), @@ -250,11 +248,11 @@ fn flatten_buckets(bucket: &Buckets) -> BTreeMap { /// otel metrics event has json object for exponential histogram /// each exponential histogram object has json array for data points /// this function flatten the exponential histogram json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &exp_histogram.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); 
data_point_json.insert( "start_time_unix_nano".to_string(), @@ -299,7 +297,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_summary(summary: &Summary) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -379,11 +377,11 @@ fn flatten_summary(summary: &Summary) -> Vec> { /// this function flattens the `Metric` object /// each metric object has json object for gauge, sum, histogram, exponential histogram, summary /// this function flatten the metric json object -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each metric record object in the otel metrics event -pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { +pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); - let mut metric_json = BTreeMap::new(); + let mut metric_json = Map::new(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { @@ -428,11 +426,11 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec Vec> { +/// and returns a `Vec` of `Map` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec> { let mut vec_otel_json = Vec::new(); for record in &message.resource_metrics { - let mut resource_metrics_json = BTreeMap::new(); + let mut resource_metrics_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_metrics_json, &resource.attributes); resource_metrics_json.insert( @@ -442,7 +440,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec } let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { - let mut scope_metrics_json = BTreeMap::new(); + let mut scope_metrics_json = Map::new(); for metrics_record in &scope_metric.metrics { vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } @@ -488,8 +486,8 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec /// there is a mapping of aggregation temporality to its description provided in proto /// this function fetches the description from the aggregation temporality /// and adds it to the flattened json -fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap { - let mut aggregation_temporality_json = BTreeMap::new(); +fn flatten_aggregation_temporality(aggregation_temporality: i32) -> Map { + let mut aggregation_temporality_json = Map::new(); aggregation_temporality_json.insert( "aggregation_temporality".to_string(), Value::Number(aggregation_temporality.into()), @@ -508,8 +506,8 @@ fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap BTreeMap { - let mut data_point_flags_json = BTreeMap::new(); +fn flatten_data_point_flags(flags: u32) -> Map { + let mut data_point_flags_json = Map::new(); data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into())); let description = match flags { 0 => "DATA_POINT_FLAGS_DO_NOT_USE", diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index fabed184f..4eb1fa2a6 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -18,11 +18,11 @@ use chrono::DateTime; use 
opentelemetry_proto::tonic::common::v1::{any_value::Value as OtelValue, AnyValue, KeyValue}; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; + // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte -pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); +pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { + let mut value_json: Map = Map::new(); match value { OtelValue::StringValue(str_val) => { value_json.insert(key.to_string(), Value::String(str_val)); @@ -86,16 +86,13 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap BTreeMap { +pub fn collect_json_from_anyvalue(key: &String, value: AnyValue) -> Map { collect_json_from_value(key, value.value.unwrap()) } //traverse through Value by calling function ollect_json_from_any_value -pub fn collect_json_from_values( - values: &Option, - key: &String, -) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); +pub fn collect_json_from_values(values: &Option, key: &String) -> Map { + let mut value_json: Map = Map::new(); for value in values.iter() { value_json = collect_json_from_anyvalue(key, value.clone()); @@ -112,8 +109,8 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes(attributes: &Vec) -> BTreeMap { - let mut attributes_json: BTreeMap = BTreeMap::new(); +pub fn flatten_attributes(attributes: &Vec) -> Map { + let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; let value = &attribute.value; @@ -125,17 +122,13 @@ pub fn flatten_attributes(attributes: &Vec) -> BTreeMap attributes_json } -pub fn insert_if_some( - map: &mut BTreeMap, - key: &str, - option: &Option, -) { +pub fn insert_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) = option { map.insert(key.to_string(), Value::String(value.to_string())); } } -pub fn insert_number_if_some(map: &mut BTreeMap, key: &str, option: &Option) { +pub fn insert_number_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) = option { if let Some(number) = serde_json::Number::from_f64(*value) { map.insert(key.to_string(), Value::Number(number)); @@ -143,13 +136,13 @@ pub fn insert_number_if_some(map: &mut BTreeMap, key: &str, optio } } -pub fn insert_bool_if_some(map: &mut BTreeMap, key: &str, option: &Option) { +pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) = option { map.insert(key.to_string(), Value::Bool(*value)); } } -pub fn insert_attributes(map: &mut BTreeMap, attributes: &Vec) { +pub fn insert_attributes(map: &mut Map, attributes: &Vec) { let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 8ba137b33..74235593c 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -22,17 +22,16 @@ use opentelemetry_proto::tonic::trace::v1::ScopeSpans; use opentelemetry_proto::tonic::trace::v1::Span; use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; /// this function flattens the `ScopeSpans` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn 
flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { let mut vec_scope_span_json = Vec::new(); - let mut scope_span_json = BTreeMap::new(); + let mut scope_span_json = Map::new(); for span in &scope_span.spans { let span_record_json = flatten_span_record(span); @@ -69,12 +68,12 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { } /// this function performs the custom flattening of the otel traces event -/// and returns a `Vec` of `BTreeMap` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec> { let mut vec_otel_json = Vec::new(); for record in &message.resource_spans { - let mut resource_span_json = BTreeMap::new(); + let mut resource_span_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_span_json, &resource.attributes); @@ -109,12 +108,12 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec> /// otel traces has json array of events /// this function flattens the `Event` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn flatten_events(events: &[Event]) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_events(events: &[Event]) -> Vec> { events .iter() .map(|event| { - let mut event_json = BTreeMap::new(); + let mut event_json = Map::new(); event_json.insert( "event_time_unix_nano".to_string(), Value::String( @@ -134,12 +133,12 @@ fn flatten_events(events: &[Event]) -> Vec> { /// otel traces has json array of links /// this function flattens the `Link` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn flatten_links(links: &[Link]) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_links(links: &[Link]) -> Vec> { links .iter() .map(|link| { - let mut link_json = BTreeMap::new(); + let mut link_json = Map::new(); link_json.insert( "link_span_id".to_string(), Value::String(hex::encode(&link.span_id)), @@ -163,8 +162,8 @@ fn flatten_links(links: &[Link]) -> Vec> { /// there is a mapping of status code to status description provided in proto /// this function fetches the status description from the status code /// and adds it to the flattened json -fn flatten_status(status: &Status) -> BTreeMap { - let mut status_json = BTreeMap::new(); +fn flatten_status(status: &Status) -> Map { + let mut status_json = Map::new(); status_json.insert( "span_status_message".to_string(), Value::String(status.message.clone()), @@ -191,8 +190,8 @@ fn flatten_status(status: &Status) -> BTreeMap { /// there is a mapping of flags to flags description provided in proto /// this function fetches the flags description from the flags /// and adds it to the flattened json -fn flatten_flags(flags: u32) -> BTreeMap { - let mut flags_json = BTreeMap::new(); +fn flatten_flags(flags: u32) -> Map { + let mut flags_json = Map::new(); flags_json.insert("span_flags".to_string(), Value::Number(flags.into())); let description = match flags { 0 => "SPAN_FLAGS_DO_NOT_USE", @@ -213,8 +212,8 @@ fn flatten_flags(flags: u32) -> BTreeMap { /// there is a mapping of kind to kind description provided in proto /// this function fetches the kind description from the kind /// and adds it to the flattened json -fn flatten_kind(kind: i32) -> BTreeMap { - let mut kind_json = BTreeMap::new(); +fn flatten_kind(kind: i32) -> Map { + let mut 
kind_json = Map::new(); kind_json.insert("span_kind".to_string(), Value::Number(kind.into())); let description = match kind { 0 => "SPAN_KIND_UNSPECIFIED", @@ -234,12 +233,12 @@ fn flatten_kind(kind: i32) -> BTreeMap { } /// this function flattens the `Span` object -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each span record object in the otel traces event -fn flatten_span_record(span_record: &Span) -> Vec> { +fn flatten_span_record(span_record: &Span) -> Vec> { let mut span_records_json = Vec::new(); - let mut span_record_json = BTreeMap::new(); + let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(), Value::String(hex::encode(&span_record.trace_id)), From c7f401f895313f093985373f7a031a013430533e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 23:03:42 +0530 Subject: [PATCH 03/19] refactor: get rid of clone --- src/handlers/http/modal/utils/ingest_utils.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 14705b13c..2b5e1a20f 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -27,7 +27,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::{ event::{ - format::{self, EventFormat, LogSource}, + format::{json, EventFormat, LogSource}, Event, }, handlers::{ @@ -127,7 +127,7 @@ pub async fn push_logs( .schema .clone(); let (rb, is_first_event) = into_event_batch( - &value, + value, schema, static_schema_flag, time_partition.as_ref(), @@ -152,17 +152,18 @@ pub async fn push_logs( } pub fn into_event_batch( - body: &Value, + data: Value, schema: HashMap>, static_schema_flag: bool, time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let event = format::json::Event { - data: body.to_owned(), - }; - let (rb, is_first) = - event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?; + let (rb, is_first) = json::Event { data }.into_recordbatch( + &schema, + static_schema_flag, + time_partition, + schema_version, + )?; Ok((rb, is_first)) } From 9ff72e78a86d1335d5ee50b6a32e13051b6efc0d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 22:30:20 +0530 Subject: [PATCH 04/19] refactor: use `Value` for JSON data --- src/handlers/http/ingest.rs | 56 +++++++++---------- src/handlers/http/modal/utils/ingest_utils.rs | 28 +++++----- src/otel/logs.rs | 6 +- src/otel/metrics.rs | 6 +- src/otel/traces.rs | 6 +- 5 files changed, 50 insertions(+), 52 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 6c4064f3e..191341b65 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -37,13 +37,13 @@ use crate::otel::traces::flatten_otel_traces; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; +use actix_web::web::Json; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; use arrow_schema::Schema; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use nom::AsBytes; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::metrics::v1::MetricsData; use 
opentelemetry_proto::tonic::trace::v1::TracesData; @@ -54,7 +54,7 @@ use std::sync::Arc; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist -pub async fn ingest(req: HttpRequest, body: Bytes) -> Result { +pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result { if let Some((_, stream_name)) = req .headers() .iter() @@ -75,7 +75,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result< // creates if stream does not exist pub async fn handle_otel_logs_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -143,10 +143,9 @@ pub async fn handle_otel_logs_ingestion( .await?; //custom flattening required for otel logs - let logs: LogsData = serde_json::from_slice(body.as_bytes())?; + let logs: LogsData = serde_json::from_value(json)?; for record in flatten_otel_logs(&logs) { - let body: Bytes = serde_json::to_vec(&record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -157,7 +156,7 @@ pub async fn handle_otel_logs_ingestion( // creates if stream does not exist pub async fn handle_otel_metrics_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -180,10 +179,9 @@ pub async fn handle_otel_metrics_ingestion( .await?; //custom flattening required for otel metrics - let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?; + let metrics: MetricsData = serde_json::from_value(json)?; for record in flatten_otel_metrics(metrics) { - let body: Bytes = serde_json::to_vec(&record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -194,7 +192,7 @@ pub async fn handle_otel_metrics_ingestion( // creates if stream does not exist pub async fn handle_otel_traces_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -218,10 +216,9 @@ pub async fn handle_otel_traces_ingestion( .await?; //custom flattening required for otel traces - let traces: TracesData = serde_json::from_slice(body.as_bytes())?; + let traces: TracesData = serde_json::from_value(json)?; for record in flatten_otel_traces(&traces) { - let body: Bytes = serde_json::to_vec(&record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -230,7 +227,10 @@ pub async fn handle_otel_traces_ingestion( // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { +pub async fn post_event( + req: HttpRequest, + Json(json): Json, +) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { @@ -253,7 +253,7 @@ pub async fn 
post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { let log_source = req @@ -56,10 +55,10 @@ pub async fn flatten_and_push_logs( match log_source { LogSource::Kinesis => { - let message: Message = serde_json::from_slice(&body)?; - for record in flatten_kinesis_logs(message) { - let body: Bytes = serde_json::to_vec(&record).unwrap().into(); - push_logs(stream_name, &body, &LogSource::default()).await?; + let message: Message = serde_json::from_value(json)?; + let json = flatten_kinesis_logs(message); + for record in json { + push_logs(stream_name, record, &LogSource::default()).await?; } } LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { @@ -68,7 +67,7 @@ pub async fn flatten_and_push_logs( ))); } _ => { - push_logs(stream_name, &body, &log_source).await?; + push_logs(stream_name, json, &log_source).await?; } } Ok(()) @@ -76,7 +75,7 @@ pub async fn flatten_and_push_logs( pub async fn push_logs( stream_name: &str, - body: &Bytes, + json: Value, log_source: &LogSource, ) -> Result<(), PostError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; @@ -84,11 +83,10 @@ pub async fn push_logs( let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; let schema_version = STREAM_INFO.get_schema_version(stream_name)?; - let body_val: Value = serde_json::from_slice(body)?; let data = if time_partition.is_some() || custom_partition.is_some() { convert_array_to_object( - body_val, + json, time_partition.as_ref(), time_partition_limit, custom_partition.as_ref(), @@ -97,7 +95,7 @@ pub async fn push_logs( )? } else { vec![convert_to_array(convert_array_to_object( - body_val, + json, None, None, None, @@ -168,12 +166,12 @@ pub fn into_event_batch( } pub fn get_custom_partition_values( - body: &Value, + json: &Value, custom_partition_list: &[&str], ) -> HashMap { let mut custom_partition_values: HashMap = HashMap::new(); for custom_partition_field in custom_partition_list { - let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); let custom_partition_value = match custom_partition_value { e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), Value::String(s) => s, @@ -187,8 +185,8 @@ pub fn get_custom_partition_values( custom_partition_values } -fn get_parsed_timestamp(body: &Value, time_partition: &str) -> Result { - let current_time = body.get(time_partition).ok_or_else(|| { +fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { + let current_time = json.get(time_partition).ok_or_else(|| { anyhow!( "Missing field for time partition from json: {:?}", time_partition diff --git a/src/otel/logs.rs b/src/otel/logs.rs index 1e5b413cd..969758d5a 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -128,8 +128,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { } /// this function performs the custom flattening of the otel logs -/// and returns a `Vec` of `Map` of the flattened json -pub fn flatten_otel_logs(message: &LogsData) -> Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_logs { let mut resource_log_json = Map::new(); @@ -158,5 +158,5 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec> { vec_otel_json.extend(vec_resource_logs_json); } - 
vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index cebddd117..aa621b03e 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -426,8 +426,8 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec } /// this function performs the custom flattening of the otel metrics -/// and returns a `Vec` of `Map` of the flattened json -pub fn flatten_otel_metrics(message: MetricsData) -> Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); @@ -479,7 +479,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec> { } vec_otel_json.extend(vec_scope_metrics_json); } - vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } /// otel metrics event has json object for aggregation temporality diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 74235593c..e1ce3406a 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -68,8 +68,8 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { } /// this function performs the custom flattening of the otel traces event -/// and returns a `Vec` of `Map` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_spans { @@ -103,7 +103,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec> { vec_otel_json.extend(vec_resource_spans_json); } - vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } /// otel traces has json array of events From c0bee1bd1163fc5f08cb8f20f53dcea7fe9cf807 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 23:48:33 +0530 Subject: [PATCH 05/19] refactor: `HeaderMap::get` and `let Some else` --- src/handlers/http/ingest.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 191341b65..551e37287 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -55,11 +55,10 @@ use std::sync::Arc; // ingests events by extracting stream name from header // creates if stream does not exist pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + }; + let stream_name = stream_name.to_str().unwrap().to_owned(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { @@ -77,9 +76,6 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { From 0270aa475728d58198b943bf45c8d9dfbf5f22cd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 22:40:06 +0530 Subject: [PATCH 06/19] refacror: ingest utils don't need http context anymore --- src/handlers/http/ingest.rs | 46 ++++++++++++------- src/handlers/http/modal/utils/ingest_utils.rs | 21 ++------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 551e37287..54c24119e 
100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -59,23 +59,29 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { @@ -249,7 +255,13 @@ pub async fn post_event( } } - flatten_and_push_logs(req, json, &stream_name).await?; + let log_source = req + .headers() + .get(LOG_SOURCE_KEY) + .and_then(|h| h.to_str().ok()) + .map_or(LogSource::default(), LogSource::from); + flatten_and_push_logs(json, &stream_name, &log_source).await?; + Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 8d1acf08c..41e6542a8 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,7 +16,6 @@ * */ -use actix_web::HttpRequest; use anyhow::anyhow; use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -29,12 +28,9 @@ use crate::{ format::{json, EventFormat, LogSource}, Event, }, - handlers::{ - http::{ - ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, - }, - LOG_SOURCE_KEY, + handlers::http::{ + ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, }, metadata::{SchemaVersion, STREAM_INFO}, storage::StreamType, @@ -42,17 +38,10 @@ use crate::{ }; pub async fn flatten_and_push_logs( - req: HttpRequest, json: Value, stream_name: &str, + log_source: &LogSource, ) -> Result<(), PostError> { - let log_source = req - .headers() - .get(LOG_SOURCE_KEY) - .map(|h| h.to_str().unwrap_or("")) - .map(LogSource::from) - .unwrap_or_default(); - match log_source { LogSource::Kinesis => { let message: Message = serde_json::from_value(json)?; @@ -67,7 +56,7 @@ pub async fn flatten_and_push_logs( ))); } _ => { - push_logs(stream_name, json, &log_source).await?; + push_logs(stream_name, json, log_source).await?; } } Ok(()) From 569d132777f398ebc1fda920abaaea7655d3738a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 Jan 2025 02:12:25 +0530 Subject: [PATCH 07/19] refactor: more descriptive error variants --- src/event/format/mod.rs | 14 +++++++ src/handlers/http/ingest.rs | 39 +++++++++++-------- .../http/modal/query/querier_ingest.rs | 4 +- src/handlers/http/modal/utils/ingest_utils.rs | 16 +++----- 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 4032c92fa..f4a44dd1f 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -19,6 +19,7 @@ use std::{ collections::{HashMap, HashSet}, + fmt::Display, sync::Arc, }; @@ -73,6 +74,19 @@ impl From<&str> for LogSource { } } +impl Display for LogSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + LogSource::Kinesis => "kinesis", + LogSource::OtelLogs => "otel-logs", + LogSource::OtelMetrics => "otel-metrics", + LogSource::OtelTraces => "otel-traces", + LogSource::Json => "json", + LogSource::Custom(custom) => custom, + }) + } +} + // Global Trait for event format // This trait is implemented by all the event formats pub trait EventFormat: Sized { diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 54c24119e..41182bfd3 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -62,10 +62,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR, + 
PostError::OtelNotSupported => StatusCode::BAD_REQUEST, + PostError::InternalStream(_) => StatusCode::BAD_REQUEST, + PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST, + PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST, + PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/modal/query/querier_ingest.rs b/src/handlers/http/modal/query/querier_ingest.rs index 1eff3999a..ae66ea8b1 100644 --- a/src/handlers/http/modal/query/querier_ingest.rs +++ b/src/handlers/http/modal/query/querier_ingest.rs @@ -25,7 +25,5 @@ use bytes::Bytes; // fails if the logstream does not exist #[allow(unused)] pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion is not allowed in Query mode" - ))) + Err(PostError::IngestionNotAllowed) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 41e6542a8..3a2b9c797 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,7 +16,6 @@ * */ -use anyhow::anyhow; use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; @@ -51,9 +50,7 @@ pub async fn flatten_and_push_logs( } } LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::Invalid(anyhow!( - "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" - ))); + return Err(PostError::OtelNotSupported); } _ => { push_logs(stream_name, json, log_source).await?; @@ -175,12 +172,9 @@ pub fn get_custom_partition_values( } fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { - let current_time = json.get(time_partition).ok_or_else(|| { - anyhow!( - "Missing field for time partition from json: {:?}", - time_partition - ) - })?; + let current_time = json + .get(time_partition) + .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?; let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; Ok(parsed_time.naive_utc()) @@ -208,7 +202,7 @@ mod tests { let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); let parsed = get_parsed_timestamp(&json, "timestamp"); - matches!(parsed, Err(PostError::Invalid(_))); + matches!(parsed, Err(PostError::MissingTimePartition(_))); } #[test] From d0682cfca741d72a5b975f3c8aff55bfcf9427a8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 Jan 2025 03:30:25 +0530 Subject: [PATCH 08/19] refactor: PUT stream header extraction --- src/event/format/mod.rs | 1 + src/handlers/http/logstream.rs | 14 +-- .../http/modal/ingest/ingestor_logstream.rs | 2 +- .../http/modal/query/querier_logstream.rs | 2 +- .../http/modal/utils/logstream_utils.rs | 100 +++++++++--------- 5 files changed, 59 insertions(+), 60 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index f4a44dd1f..4dc89a5eb 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -82,6 +82,7 @@ impl Display for LogSource { LogSource::OtelMetrics => "otel-metrics", LogSource::OtelTraces => "otel-traces", LogSource::Json => "json", + LogSource::Pmeta => "pmeta", LogSource::Custom(custom) => custom, }) } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 7eac4e822..114882af0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -193,7 +193,7 @@ pub async fn get_alert(req: HttpRequest) -> Result pub async fn 
put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - create_update_stream(&req, &body, &stream_name).await?; + create_update_stream(req.headers(), &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } @@ -903,7 +903,7 @@ pub mod error { mod tests { use crate::handlers::http::logstream::error::StreamError; use crate::handlers::http::logstream::get_stats; - use crate::handlers::http::modal::utils::logstream_utils::fetch_headers_from_put_stream_request; + use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; use actix_web::test::TestRequest; use anyhow::bail; #[actix_web::test] @@ -928,7 +928,7 @@ mod tests { #[actix_web::test] async fn header_without_log_source() { let req = TestRequest::default().to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Json); } @@ -937,19 +937,19 @@ mod tests { let mut req = TestRequest::default() .insert_header(("X-P-Log-Source", "pmeta")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Pmeta); req = TestRequest::default() .insert_header(("X-P-Log-Source", "otel-logs")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::OtelLogs); req = TestRequest::default() .insert_header(("X-P-Log-Source", "kinesis")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Kinesis); } @@ -958,7 +958,7 @@ mod tests { let req = TestRequest::default() .insert_header(("X-P-Log-Source", "teststream")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. 
} = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Json); } } diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 3f0e5292d..b61e2db54 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -83,7 +83,7 @@ pub async fn delete(req: HttpRequest) -> Result { pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - create_update_stream(&req, &body, &stream_name).await?; + create_update_stream(req.headers(), &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 58277f7b8..baf0cde47 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -110,7 +110,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result { - let ( + let PutStreamHeaders { time_partition, time_partition_limit, custom_partition, @@ -50,7 +50,7 @@ pub async fn create_update_stream( update_stream_flag, stream_type, log_source, - ) = fetch_headers_from_put_stream_request(req); + } = headers.into(); if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag { return Err(StreamError::Custom { @@ -75,7 +75,7 @@ pub async fn create_update_stream( if update_stream_flag { return update_stream( - req, + headers, stream_name, &time_partition, static_schema_flag, @@ -119,11 +119,11 @@ pub async fn create_update_stream( ) .await?; - Ok(req.headers().clone()) + Ok(headers.clone()) } async fn update_stream( - req: &HttpRequest, + headers: &HeaderMap, stream_name: &str, time_partition: &str, static_schema_flag: bool, @@ -148,11 +148,11 @@ async fn update_stream( if !time_partition_limit.is_empty() { let time_partition_days = validate_time_partition_limit(time_partition_limit)?; update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days).await?; - return Ok(req.headers().clone()); + return Ok(headers.clone()); } validate_and_update_custom_partition(stream_name, custom_partition).await?; - Ok(req.headers().clone()) + Ok(headers.clone()) } async fn validate_and_update_custom_partition( @@ -168,49 +168,47 @@ async fn validate_and_update_custom_partition( Ok(()) } -pub fn fetch_headers_from_put_stream_request( - req: &HttpRequest, -) -> (String, String, String, bool, bool, String, LogSource) { - let mut time_partition = String::default(); - let mut time_partition_limit = String::default(); - let mut custom_partition = String::default(); - let mut static_schema_flag = false; - let mut update_stream_flag = false; - let mut stream_type = StreamType::UserDefined.to_string(); - let mut log_source = LogSource::default(); - req.headers().iter().for_each(|(key, value)| { - if key == TIME_PARTITION_KEY { - time_partition = value.to_str().unwrap().to_string(); - } - if key == TIME_PARTITION_LIMIT_KEY { - time_partition_limit = value.to_str().unwrap().to_string(); - } - if key == CUSTOM_PARTITION_KEY { - custom_partition = value.to_str().unwrap().to_string(); - } - if key == STATIC_SCHEMA_FLAG && value.to_str().unwrap() == "true" { - static_schema_flag = true; - } - if key == UPDATE_STREAM_KEY && value.to_str().unwrap() == "true" { - update_stream_flag = true; - } - if key == STREAM_TYPE_KEY { - stream_type = 
value.to_str().unwrap().to_string(); - } - if key == LOG_SOURCE_KEY { - log_source = LogSource::from(value.to_str().unwrap()); - } - }); +#[derive(Debug, Default)] +pub struct PutStreamHeaders { + pub time_partition: String, + pub time_partition_limit: String, + pub custom_partition: String, + pub static_schema_flag: bool, + pub update_stream_flag: bool, + pub stream_type: String, + pub log_source: LogSource, +} - ( - time_partition, - time_partition_limit, - custom_partition, - static_schema_flag, - update_stream_flag, - stream_type, - log_source, - ) +impl From<&HeaderMap> for PutStreamHeaders { + fn from(headers: &HeaderMap) -> Self { + PutStreamHeaders { + time_partition: headers + .get(TIME_PARTITION_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + time_partition_limit: headers + .get(TIME_PARTITION_LIMIT_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + custom_partition: headers + .get(CUSTOM_PARTITION_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + static_schema_flag: headers + .get(STATIC_SCHEMA_FLAG) + .is_some_and(|v| v.to_str().unwrap() == "true"), + update_stream_flag: headers + .get(UPDATE_STREAM_KEY) + .is_some_and(|v| v.to_str().unwrap() == "true"), + stream_type: headers + .get(STREAM_TYPE_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + log_source: headers + .get(LOG_SOURCE_KEY) + .map_or(LogSource::default(), |v| v.to_str().unwrap().into()), + } + } } pub fn validate_time_partition_limit( From 7af329d60aeb5a084d3feae79d1314f877bb33b0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 22:49:41 +0530 Subject: [PATCH 09/19] refactor: use Path and Json extractor --- src/handlers/http/cluster/mod.rs | 8 +- src/handlers/http/ingest.rs | 5 +- src/handlers/http/logstream.rs | 108 +++++++++--------- .../http/modal/ingest/ingestor_logstream.rs | 19 +-- .../http/modal/query/querier_logstream.rs | 26 +++-- src/handlers/http/users/dashboards.rs | 41 +++---- src/handlers/http/users/filters.rs | 47 ++++---- 7 files changed, 139 insertions(+), 115 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 4e936c79d..b32ba2787 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -35,7 +35,8 @@ use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use crate::HTTP_CLIENT; use actix_web::http::header::{self, HeaderMap}; -use actix_web::{HttpRequest, Responder}; +use actix_web::web::Path; +use actix_web::Responder; use bytes::Bytes; use chrono::Utc; use http::{header as http_header, StatusCode}; @@ -676,9 +677,8 @@ pub async fn get_ingestor_info() -> anyhow::Result { Ok(arr) } -pub async fn remove_ingestor(req: HttpRequest) -> Result { - let domain_name: String = req.match_info().get("ingestor").unwrap().parse().unwrap(); - let domain_name = to_url_string(domain_name); +pub async fn remove_ingestor(ingestor: Path) -> Result { + let domain_name = to_url_string(ingestor.into_inner()); if check_liveness(&domain_name).await { return Err(PostError::Invalid(anyhow::anyhow!( diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 41182bfd3..8d70a348d 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -37,7 +37,7 @@ use crate::otel::traces::flatten_otel_traces; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; -use 
actix_web::web::Json; +use actix_web::web::{Json, Path}; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; use arrow_schema::Schema; @@ -222,9 +222,10 @@ pub async fn handle_otel_traces_ingestion( // fails if the logstream does not exist pub async fn post_event( req: HttpRequest, + stream_name: Path, Json(json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { return Err(PostError::InternalStream(stream_name)); diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 114882af0..5d95bd2de 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -43,6 +43,7 @@ use crate::{event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use actix_web::http::StatusCode; +use actix_web::web::{Json, Path}; use actix_web::{web, HttpRequest, Responder}; use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; @@ -58,8 +59,8 @@ use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } @@ -113,9 +114,8 @@ pub async fn list(req: HttpRequest) -> Result { Ok(web::Json(res)) } -pub async fn detect_schema(body: Bytes) -> Result { - let body_val: Value = serde_json::from_slice(&body)?; - let log_records: Vec = match body_val { +pub async fn detect_schema(Json(json): Json) -> Result { + let log_records: Vec = match json { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], _ => { @@ -133,8 +133,8 @@ pub async fn detect_schema(body: Bytes) -> Result { Ok((web::Json(schema), StatusCode::OK)) } -pub async fn schema(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn schema(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); match STREAM_INFO.schema(&stream_name) { Ok(_) => {} @@ -157,8 +157,8 @@ pub async fn schema(req: HttpRequest) -> Result { } } -pub async fn get_alert(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_alert(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); let alerts = metadata::STREAM_INFO .read() @@ -190,8 +190,12 @@ pub async fn get_alert(req: HttpRequest) -> Result Ok((web::Json(alerts), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); create_update_stream(req.headers(), &body, &stream_name).await?; @@ -199,15 +203,13 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result, + stream_name: Path, + Json(mut json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = 
stream_name.into_inner(); - let mut body = body.into_inner(); - remove_id_from_alerts(&mut body); - - let alerts: Alerts = match serde_json::from_value(body) { + remove_id_from_alerts(&mut json); + let alerts: Alerts = match serde_json::from_value(json) { Ok(alerts) => alerts, Err(err) => { return Err(StreamError::BadAlertJson { @@ -265,8 +267,8 @@ pub async fn put_alert( )) } -pub async fn get_retention(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_retention(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -295,10 +297,10 @@ pub async fn get_retention(req: HttpRequest) -> Result, + stream_name: Path, + Json(json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -314,9 +316,7 @@ pub async fn put_retention( } } - let body = body.into_inner(); - - let retention: Retention = match serde_json::from_value(body) { + let retention: Retention = match serde_json::from_value(json) { Ok(retention) => retention, Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), }; @@ -361,8 +361,11 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stats( + req: HttpRequest, + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -380,8 +383,9 @@ pub async fn get_stats(req: HttpRequest) -> Result let query_string = req.query_string(); if !query_string.is_empty() { - let date_key = query_string.split('=').collect::>()[0]; - let date_value = query_string.split('=').collect::>()[1]; + let tokens = query_string.split('=').collect::>(); + let date_key = tokens[0]; + let date_value = tokens[1]; if date_key != "date" { return Err(StreamError::Custom { msg: "Invalid query parameter".to_string(), @@ -546,8 +550,8 @@ pub async fn create_stream( Ok(()) } -pub async fn get_stream_info(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stream_info(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { @@ -596,10 +600,10 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, + stream_name: Path, + Json(json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -624,8 +628,7 @@ pub async fn put_stream_hot_tier( return Err(StreamError::HotTierNotEnabled(stream_name)); } - let body = body.into_inner(); - let mut hottier: StreamHotTier = match serde_json::from_value(body) { + let mut hottier: StreamHotTier = match serde_json::from_value(json) { Ok(hottier) => hottier, Err(err) => return 
Err(StreamError::InvalidHotTierConfig(err)), }; @@ -657,8 +660,8 @@ pub async fn put_stream_hot_tier( )) } -pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stream_hot_tier(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -692,8 +695,10 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete_stream_hot_tier( + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -905,21 +910,22 @@ mod tests { use crate::handlers::http::logstream::get_stats; use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; use actix_web::test::TestRequest; + use actix_web::web; use anyhow::bail; - #[actix_web::test] - #[should_panic] - async fn get_stats_panics_without_logstream() { - let req = TestRequest::default().to_http_request(); - let _ = get_stats(req).await; - } + + // TODO: Fix this test with routes + // #[actix_web::test] + // #[should_panic] + // async fn get_stats_panics_without_logstream() { + // let req = TestRequest::default().to_http_request(); + // let _ = get_stats(req).await; + // } #[actix_web::test] async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - let req = TestRequest::default() - .param("logstream", "test") - .to_http_request(); + let req = TestRequest::default().to_http_request(); - match get_stats(req).await { + match get_stats(req, web::Path::from("test".to_string())).await { Err(StreamError::StreamNotFound(_)) => Ok(()), _ => bail!("expected StreamNotFound error"), } diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index b61e2db54..6b0f203eb 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -16,7 +16,7 @@ * */ -use actix_web::{HttpRequest, Responder}; +use actix_web::{web::Path, HttpRequest, Responder}; use bytes::Bytes; use http::StatusCode; use tracing::warn; @@ -36,10 +36,10 @@ use crate::{ }; pub async fn retention_cleanup( - req: HttpRequest, + stream_name: Path, body: Bytes, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); let storage = CONFIG.storage().get_object_store(); // if the stream not found in memory map, //check if it exists in the storage @@ -59,8 +59,8 @@ pub async fn retention_cleanup( Ok((first_event_at, StatusCode::OK)) } -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -80,9 +80,12 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = 
req.match_info().get("logstream").unwrap().parse().unwrap(); - +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); create_update_stream(req.headers(), &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index baf0cde47..622383964 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -19,7 +19,10 @@ use core::str; use std::fs; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{ + web::{self, Path}, + HttpRequest, Responder, +}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; @@ -49,8 +52,8 @@ use crate::{ storage::{StorageDir, StreamType}, }; -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage @@ -106,9 +109,12 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); let _ = CREATE_STREAM_LOCK.lock().await; let headers = create_update_stream(req.headers(), &body, &stream_name).await?; @@ -117,9 +123,11 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - +pub async fn get_stats( + req: HttpRequest, + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs index 354689834..4e54f1232 100644 --- a/src/handlers/http/users/dashboards.rs +++ b/src/handlers/http/users/dashboards.rs @@ -23,7 +23,7 @@ use crate::{ users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS}, utils::{get_hash, get_user_from_request}, }; -use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{http::header::ContentType, web, web::Path, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; use rand::distributions::DistString; @@ -38,14 +38,14 @@ pub async fn list(req: HttpRequest) -> Result { Ok((web::Json(dashboards), StatusCode::OK)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + dashboard_id: Path, +) -> Result { let user_id = get_user_from_request(&req)?; - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + let dashboard_id = dashboard_id.into_inner(); - if let Some(dashboard) = DASHBOARDS.get_dashboard(dashboard_id, &get_hash(&user_id)) { + if let Some(dashboard) = DASHBOARDS.get_dashboard(&dashboard_id, &get_hash(&user_id)) { return Ok((web::Json(dashboard), StatusCode::OK)); } @@ -84,15 +84,16 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result 
{ +pub async fn update( + req: HttpRequest, + dashboard_id: Path, + body: Bytes, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + let dashboard_id = dashboard_id.into_inner(); - if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() { return Err(DashboardError::Metadata("Dashboard does not exist")); } let mut dashboard: Dashboard = serde_json::from_slice(&body)?; @@ -117,21 +118,21 @@ pub async fn update(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn delete( + req: HttpRequest, + dashboard_id: Path, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; - if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + let dashboard_id = dashboard_id.into_inner(); + if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() { return Err(DashboardError::Metadata("Dashboard does not exist")); } let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id)); let store = CONFIG.storage().get_object_store(); store.delete_object(&path).await?; - DASHBOARDS.delete_dashboard(dashboard_id); + DASHBOARDS.delete_dashboard(&dashboard_id); Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs index e8f00c901..7d1029a46 100644 --- a/src/handlers/http/users/filters.rs +++ b/src/handlers/http/users/filters.rs @@ -23,7 +23,11 @@ use crate::{ users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS}, utils::{get_hash, get_user_from_request}, }; -use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{ + http::header::ContentType, + web::{self, Path}, + HttpRequest, HttpResponse, Responder, +}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; @@ -35,14 +39,14 @@ pub async fn list(req: HttpRequest) -> Result { Ok((web::Json(filters), StatusCode::OK)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + filter_id: Path, +) -> Result { let user_id = get_user_from_request(&req)?; - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter_id = filter_id.into_inner(); - if let Some(filter) = FILTERS.get_filter(filter_id, &get_hash(&user_id)) { + if let Some(filter) = FILTERS.get_filter(&filter_id, &get_hash(&user_id)) { return Ok((web::Json(filter), StatusCode::OK)); } @@ -72,18 +76,19 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn update( + req: HttpRequest, + filter_id: Path, + body: Bytes, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; - if FILTERS.get_filter(filter_id, &user_id).is_none() { + let filter_id = filter_id.into_inner(); + if FILTERS.get_filter(&filter_id, &user_id).is_none() { return Err(FiltersError::Metadata("Filter does not exist")); } let mut filter: Filter = serde_json::from_slice(&body)?; - filter.filter_id = Some(filter_id.to_string()); + filter.filter_id = Some(filter_id.clone()); filter.user_id = Some(user_id.clone()); 
filter.version = Some(CURRENT_FILTER_VERSION.to_string()); FILTERS.update(&filter); @@ -101,15 +106,15 @@ pub async fn update(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn delete( + req: HttpRequest, + filter_id: Path, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter_id = filter_id.into_inner(); let filter = FILTERS - .get_filter(filter_id, &user_id) + .get_filter(&filter_id, &user_id) .ok_or(FiltersError::Metadata("Filter does not exist"))?; let path = filter_path( @@ -120,7 +125,7 @@ pub async fn delete(req: HttpRequest) -> Result { let store = CONFIG.storage().get_object_store(); store.delete_object(&path).await?; - FILTERS.delete_filter(filter_id); + FILTERS.delete_filter(&filter_id); Ok(HttpResponse::Ok().finish()) } From 1eaf769bbf1a2b6910c30995b447c8095c6feec7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 13:55:40 +0530 Subject: [PATCH 10/19] don't extract where not required --- src/handlers/http/modal/query/querier_ingest.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/modal/query/querier_ingest.rs b/src/handlers/http/modal/query/querier_ingest.rs index ae66ea8b1..6b74edafb 100644 --- a/src/handlers/http/modal/query/querier_ingest.rs +++ b/src/handlers/http/modal/query/querier_ingest.rs @@ -16,14 +16,14 @@ * */ +use actix_web::HttpResponse; + use crate::handlers::http::ingest::PostError; -use actix_web::{HttpRequest, HttpResponse}; -use bytes::Bytes; // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist #[allow(unused)] -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { +pub async fn post_event() -> Result { Err(PostError::IngestionNotAllowed) } From 0b2639478a38250a91730f1bf2fad905c02ad619 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 13:50:15 +0530 Subject: [PATCH 11/19] refactor: serde `date_list` --- src/catalog/mod.rs | 5 +---- src/handlers/http/cluster/mod.rs | 4 ++-- src/handlers/http/modal/ingest/ingestor_logstream.rs | 8 +++++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 0a07855f3..aa46afb4f 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -33,7 +33,6 @@ use crate::{ query::PartialTimeFilter, storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError}, }; -use bytes::Bytes; use chrono::{DateTime, Local, NaiveTime, Utc}; use relative_path::RelativePathBuf; use std::io::Error as IOError; @@ -412,13 +411,11 @@ pub async fn get_first_event( base_path_without_preceding_slash(), stream_name ); - // Convert dates vector to Bytes object - let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap()); let ingestor_first_event_at = handlers::http::cluster::send_retention_cleanup_request( &url, ingestor.clone(), - dates_bytes, + &dates, ) .await?; if !ingestor_first_event_at.is_empty() { diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index b32ba2787..285b187b9 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -539,7 +539,7 @@ pub async fn send_stream_delete_request( pub async fn send_retention_cleanup_request( url: &str, ingestor: IngestorMetadata, - body: Bytes, + dates: &Vec, ) -> Result { let mut first_event_at: String = 
String::default(); if !utils::check_liveness(&ingestor.domain_name).await { @@ -549,7 +549,7 @@ pub async fn send_retention_cleanup_request( .post(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) - .body(body) + .json(dates) .send() .await .map_err(|err| { diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 6b0f203eb..40efce1a4 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -16,7 +16,10 @@ * */ -use actix_web::{web::Path, HttpRequest, Responder}; +use actix_web::{ + web::{Json, Path}, + HttpRequest, Responder, +}; use bytes::Bytes; use http::StatusCode; use tracing::warn; @@ -37,7 +40,7 @@ use crate::{ pub async fn retention_cleanup( stream_name: Path, - body: Bytes, + Json(date_list): Json>, ) -> Result { let stream_name = stream_name.into_inner(); let storage = CONFIG.storage().get_object_store(); @@ -52,7 +55,6 @@ pub async fn retention_cleanup( return Err(StreamError::StreamNotFound(stream_name.clone())); } - let date_list: Vec = serde_json::from_slice(&body).unwrap(); let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; let first_event_at: Option = res.unwrap_or_default(); From d27a763b2b63727bcae1aa826f066620ec48e7c0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 23:05:11 +0530 Subject: [PATCH 12/19] refactor: serde `DefaultPrivilege` --- src/handlers/http/cluster/mod.rs | 10 ++-------- src/handlers/http/modal/ingest/ingestor_role.rs | 14 +++++++++----- src/handlers/http/modal/query/querier_role.rs | 12 ++++++++---- src/handlers/http/role.rs | 13 +++++++++---- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 285b187b9..91bdf288c 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -322,19 +322,13 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), // forward the put role request to all ingestors to keep them in sync pub async fn sync_role_update_with_ingestors( name: String, - body: Vec, + privileges: Vec, ) -> Result<(), RoleError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { error!("Fatal: failed to get ingestor info: {:?}", err); RoleError::Anyhow(err) })?; - let roles = to_vec(&body).map_err(|err| { - error!("Fatal: failed to serialize roles: {:?}", err); - RoleError::SerdeError(err) - })?; - let roles = Bytes::from(roles); - for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { warn!("Ingestor {} is not live", ingestor.domain_name); @@ -351,7 +345,7 @@ pub async fn sync_role_update_with_ingestors( .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") - .body(roles.clone()) + .json(&privileges) .send() .await .map_err(|err| { diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index 499157136..d48b9efdf 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -16,8 +16,10 @@ * */ -use actix_web::{web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + web::{self, Json}, + HttpResponse, Responder, +}; use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, @@ -27,14 +29,16 @@ use crate::{ 
// Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); let _ = storage::put_staging_metadata(&metadata); - mut_roles().insert(name.clone(), privileges.clone()); + mut_roles().insert(name.clone(), privileges); Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index b9930579c..b8b6f4639 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -16,8 +16,10 @@ * */ -use actix_web::{web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + web::{self, Json}, + HttpResponse, Responder, +}; use crate::{ handlers::http::{ @@ -30,9 +32,11 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index 757f4f170..711eade9b 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,8 +16,11 @@ * */ -use actix_web::{http::header::ContentType, web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + http::header::ContentType, + web::{self, Json}, + HttpResponse, Responder, +}; use http::StatusCode; use crate::{ @@ -31,9 +34,11 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); From 75838324c15210af9a953b4c0382bd29c614f2a8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 23:06:10 +0530 Subject: [PATCH 13/19] refactor: serde `Dashboard` --- src/handlers/http/users/dashboards.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs index 4e54f1232..f95de4559 100644 --- a/src/handlers/http/users/dashboards.rs +++ b/src/handlers/http/users/dashboards.rs @@ -23,7 +23,11 @@ use crate::{ users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS}, utils::{get_hash, get_user_from_request}, }; -use actix_web::{http::header::ContentType, web, web::Path, HttpRequest, HttpResponse, Responder}; +use actix_web::{ + http::header::ContentType, + web::{self, Json, Path}, + HttpRequest, HttpResponse, Responder, +}; use bytes::Bytes; use rand::distributions::DistString; @@ -52,10 +56,12 @@ pub async fn get( Err(DashboardError::Metadata("Dashboard does not exist")) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(mut dashboard): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = 
get_hash(&user_id); - let mut dashboard: Dashboard = serde_json::from_slice(&body)?; let dashboard_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); dashboard.dashboard_id = Some(dashboard_id.clone()); dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); @@ -87,7 +93,7 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result, - body: Bytes, + Json(mut dashboard): Json, ) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); @@ -96,7 +102,6 @@ pub async fn update( if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() { return Err(DashboardError::Metadata("Dashboard does not exist")); } - let mut dashboard: Dashboard = serde_json::from_slice(&body)?; dashboard.dashboard_id = Some(dashboard_id.to_string()); dashboard.user_id = Some(user_id.clone()); dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); From 9248ef764dd2254871c6583fb94036d6219fdeb6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 23:06:53 +0530 Subject: [PATCH 14/19] refactor: serde `Filter` --- src/handlers/http/users/filters.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs index 7d1029a46..c3e2ed905 100644 --- a/src/handlers/http/users/filters.rs +++ b/src/handlers/http/users/filters.rs @@ -25,7 +25,7 @@ use crate::{ }; use actix_web::{ http::header::ContentType, - web::{self, Path}, + web::{self, Json, Path}, HttpRequest, HttpResponse, Responder, }; use bytes::Bytes; @@ -53,10 +53,12 @@ pub async fn get( Err(FiltersError::Metadata("Filter does not exist")) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(mut filter): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let mut filter: Filter = serde_json::from_slice(&body)?; let filter_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); filter.filter_id = Some(filter_id.clone()); filter.user_id = Some(user_id.clone()); @@ -79,7 +81,7 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result, - body: Bytes, + Json(mut filter): Json, ) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); @@ -87,7 +89,6 @@ pub async fn update( if FILTERS.get_filter(&filter_id, &user_id).is_none() { return Err(FiltersError::Metadata("Filter does not exist")); } - let mut filter: Filter = serde_json::from_slice(&body)?; filter.filter_id = Some(filter_id.clone()); filter.user_id = Some(user_id.clone()); filter.version = Some(CURRENT_FILTER_VERSION.to_string()); From 2a2828545b3904f58d3145c959d49e82e1e204f8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 21 Jan 2025 16:45:51 +0530 Subject: [PATCH 15/19] refactor: move up `p_timestamp` addition to recordbatch --- src/event/format/mod.rs | 11 +++++++++-- src/event/writer/mod.rs | 15 +-------------- src/utils/arrow/mod.rs | 39 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 4dc89a5eb..a574c137d 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -30,7 +30,7 @@ use chrono::DateTime; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::{metadata::SchemaVersion, utils::arrow::get_field}; +use crate::{metadata::SchemaVersion, utils::arrow::{get_field, get_timestamp_array, replace_columns}}; use super::DEFAULT_TIMESTAMP_KEY; @@ 
-141,7 +141,14 @@ pub trait EventFormat: Sized { } new_schema = update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); - let rb = Self::decode(data, new_schema.clone())?; + + let mut rb = Self::decode(data, new_schema.clone())?; + rb = replace_columns( + rb.schema(), + &rb, + &[0], + &[Arc::new(get_timestamp_array(rb.num_rows()))], + ); Ok((rb, is_first)) } diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index 9efbc3fcc..59966113b 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -28,14 +28,12 @@ use std::{ use crate::{ option::{Mode, CONFIG}, storage::StreamType, - utils, }; use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; -use arrow_array::{RecordBatch, TimestampMillisecondArray}; +use arrow_array::RecordBatch; use arrow_schema::Schema; use chrono::NaiveDateTime; -use chrono::Utc; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -56,13 +54,6 @@ impl Writer { parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { - let rb = utils::arrow::replace_columns( - rb.schema(), - &rb, - &[0], - &[Arc::new(get_timestamp_array(rb.num_rows()))], - ); - self.disk.push( stream_name, schema_key, @@ -243,10 +234,6 @@ impl WriterTable { } } -fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { - TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) -} - pub mod errors { #[derive(Debug, thiserror::Error)] diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index b3105eeee..93fb271b0 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -42,8 +42,9 @@ use std::sync::Arc; -use arrow_array::{Array, RecordBatch}; +use arrow_array::{Array, RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; +use chrono::Utc; use itertools::Itertools; pub mod batch_adapter; @@ -125,6 +126,19 @@ pub fn get_field<'a>( .find(|field| field.name() == name) } +/// Constructs an array of the current timestamp. +/// +/// # Arguments +/// +/// * `size` - The number of rows for which timestamp values are to be added. +/// +/// # Returns +/// +/// A column in arrow, containing the current timestamp in millis. 
+pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { + TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -132,7 +146,7 @@ mod tests { use arrow_array::{Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; - use super::{record_batches_to_json, replace_columns}; + use super::*; #[test] fn check_replace() { @@ -170,4 +184,25 @@ mod tests { let batches = record_batches_to_json(&rb).unwrap(); assert_eq!(batches, vec![]); } + + #[test] + fn test_timestamp_array_has_correct_size_and_value() { + let size = 5; + let now = Utc::now().timestamp_millis(); + + let array = get_timestamp_array(size); + + assert_eq!(array.len(), size); + for i in 0..size { + assert!(array.value(i) >= now); + } + } + + #[test] + fn test_timestamp_array_with_zero_size() { + let array = get_timestamp_array(0); + + assert_eq!(array.len(), 0); + assert!(array.is_empty()); + } } From ff50822e341afcd4afdb19c370eb22741143ef50 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 21 Jan 2025 17:37:52 +0530 Subject: [PATCH 16/19] refactor: refer over clone --- src/event/mod.rs | 40 +++++++++------------------------- src/event/writer/mem_writer.rs | 6 ++--- src/event/writer/mod.rs | 18 +++++++-------- 3 files changed, 22 insertions(+), 42 deletions(-) diff --git a/src/event/mod.rs b/src/event/mod.rs index 2e9bc7359..0c2c1f6b9 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -28,7 +28,7 @@ use tracing::error; use self::error::EventError; pub use self::writer::STREAM_WRITERS; -use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType}; +use crate::{metadata, storage::StreamType}; use chrono::NaiveDateTime; use std::collections::HashMap; @@ -49,7 +49,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub async fn process(&self) -> Result<(), EventError> { + pub async fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); @@ -69,10 +69,10 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } - Self::process_event( + STREAM_WRITERS.append_to_local( &self.stream_name, &key, - self.rb.clone(), + &self.rb, self.parsed_timestamp, &self.custom_partition_values, &self.stream_type, @@ -98,44 +98,24 @@ impl Event { Ok(()) } - pub fn process_unchecked(&self) -> Result<(), PostError> { + pub fn process_unchecked(&self) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); - Self::process_event( + STREAM_WRITERS.append_to_local( &self.stream_name, &key, - self.rb.clone(), + &self.rb, self.parsed_timestamp, &self.custom_partition_values, &self.stream_type, - ) - .map_err(PostError::Event) + )?; + + Ok(()) } pub fn clear(&self, stream_name: &str) { STREAM_WRITERS.clear(stream_name); } - - // event process all events after the 1st event. Concatenates record batches - // and puts them in memory store for each event. 
- fn process_event( - stream_name: &str, - schema_key: &str, - rb: RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), EventError> { - STREAM_WRITERS.append_to_local( - stream_name, - schema_key, - rb, - parsed_timestamp, - custom_partition_values.clone(), - stream_type, - )?; - Ok(()) - } } pub fn get_schema_key(fields: &[Arc]) -> String { diff --git a/src/event/writer/mem_writer.rs b/src/event/writer/mem_writer.rs index 561f2c4e5..d24077333 100644 --- a/src/event/writer/mem_writer.rs +++ b/src/event/writer/mem_writer.rs @@ -50,7 +50,7 @@ impl Default for MemWriter { } impl MemWriter { - pub fn push(&mut self, schema_key: &str, rb: RecordBatch) { + pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) { if !self.schema_map.contains(schema_key) { self.schema_map.insert(schema_key.to_owned()); self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap(); @@ -97,7 +97,7 @@ pub struct MutableBuffer { } impl MutableBuffer { - fn push(&mut self, rb: RecordBatch) -> Option> { + fn push(&mut self, rb: &RecordBatch) -> Option> { if self.rows + rb.num_rows() >= N { let left = N - self.rows; let right = rb.num_rows() - left; @@ -121,7 +121,7 @@ impl MutableBuffer { Some(inner) } else { self.rows += rb.num_rows(); - self.inner.push(rb); + self.inner.push(rb.clone()); None } } diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index 59966113b..50f063a12 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -50,14 +50,14 @@ impl Writer { &mut self, stream_name: &str, schema_key: &str, - rb: RecordBatch, + rb: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { self.disk.push( stream_name, schema_key, - &rb, + rb, parsed_timestamp, custom_partition_values, )?; @@ -65,7 +65,7 @@ impl Writer { Ok(()) } - fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> { + fn push_mem(&mut self, schema_key: &str, rb: &RecordBatch) -> Result<(), StreamWriterError> { self.mem.push(schema_key, rb); Ok(()) } @@ -80,9 +80,9 @@ impl WriterTable { &self, stream_name: &str, schema_key: &str, - record: RecordBatch, + record: &RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); @@ -95,7 +95,7 @@ impl WriterTable { schema_key, record, parsed_timestamp, - &custom_partition_values, + custom_partition_values, stream_type, )?; } @@ -110,7 +110,7 @@ impl WriterTable { schema_key, record, parsed_timestamp, - &custom_partition_values, + custom_partition_values, stream_type, )?; } @@ -124,7 +124,7 @@ impl WriterTable { stream_writer: &Mutex, stream_name: &str, schema_key: &str, - record: RecordBatch, + record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, stream_type: &StreamType, @@ -153,7 +153,7 @@ impl WriterTable { mut map: RwLockWriteGuard>>, stream_name: &str, schema_key: &str, - record: RecordBatch, + record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, stream_type: &StreamType, From 6a1175a0bebe62ccc56eb6781e0421e249358be1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 21 Jan 2025 17:38:35 +0530 Subject: [PATCH 17/19] fix: don't hog write privileges --- src/event/writer/mod.rs | 120 +++++++++++++++++----------------------- 1 file changed, 
51 insertions(+), 69 deletions(-) diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index 50f063a12..5c21e5d31 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -22,7 +22,7 @@ mod mem_writer; use std::{ collections::HashMap, - sync::{Arc, Mutex, RwLock, RwLockWriteGuard}, + sync::{Arc, Mutex, RwLock}, }; use crate::{ @@ -75,7 +75,7 @@ impl Writer { pub struct WriterTable(RwLock>>); impl WriterTable { - // append to a existing stream + // Concatenates record batches and puts them in memory store for each event. pub fn append_to_local( &self, stream_name: &str, @@ -85,43 +85,33 @@ impl WriterTable { custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { - let hashmap_guard = self.read().unwrap(); - - match hashmap_guard.get(stream_name) { - Some(stream_writer) => { - self.handle_existing_writer( - stream_writer, - stream_name, - schema_key, - record, - parsed_timestamp, + let has_stream = self.read().unwrap().contains_key(stream_name); + if has_stream { + self.handle_existing_writer( + stream_name, + schema_key, + record, + parsed_timestamp, custom_partition_values, - stream_type, - )?; - } - None => { - drop(hashmap_guard); - let map = self.write().unwrap(); - // check for race condition - // if map contains entry then just - self.handle_missing_writer( - map, - stream_name, - schema_key, - record, - parsed_timestamp, + stream_type, + )?; + } else { + self.handle_missing_writer( + stream_name, + schema_key, + record, + parsed_timestamp, custom_partition_values, - stream_type, - )?; - } + stream_type, + )?; }; + Ok(()) } - #[allow(clippy::too_many_arguments)] + /// Update writer for stream when it already exists fn handle_existing_writer( &self, - stream_writer: &Mutex, stream_name: &str, schema_key: &str, record: &RecordBatch, @@ -129,8 +119,14 @@ impl WriterTable { custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { + let hashmap_guard = self.read().unwrap(); + let mut stream_writer = hashmap_guard + .get(stream_name) + .expect("Stream exists") + .lock() + .unwrap(); if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - stream_writer.lock().unwrap().push( + stream_writer.push( stream_name, schema_key, record, @@ -138,19 +134,17 @@ impl WriterTable { custom_partition_values, )?; } else { - stream_writer - .lock() - .unwrap() - .push_mem(stream_name, record)?; + stream_writer.push_mem(stream_name, record)?; } Ok(()) } - #[allow(clippy::too_many_arguments)] + /// Construct a writer for new stream of data + /// + /// TODO: verify with model checker fn handle_missing_writer( &self, - mut map: RwLockWriteGuard>>, stream_name: &str, schema_key: &str, record: &RecordBatch, @@ -158,38 +152,26 @@ impl WriterTable { custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { - match map.get(stream_name) { - Some(writer) => { - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - writer.lock().unwrap().push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - } else { - writer.lock().unwrap().push_mem(stream_name, record)?; - } - } - None => { - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - let mut writer = Writer::default(); - writer.push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - map.insert(stream_name.to_owned(), Mutex::new(writer)); - } else { - let mut 
writer = Writer::default(); - writer.push_mem(schema_key, record)?; - map.insert(stream_name.to_owned(), Mutex::new(writer)); - } - } + // Gets write privileges only for inserting a writer + self.write() + .unwrap() + .insert(stream_name.to_owned(), Mutex::new(Writer::default())); + // Updates the writer with read privileges + let hashmap_guard = self.read().unwrap(); + let writer = hashmap_guard.get(stream_name).expect("Stream exists"); + + if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { + writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + custom_partition_values, + )?; + } else { + writer.lock().unwrap().push_mem(stream_name, record)?; } + Ok(()) } From e9c53de6507cffed95bb9016243730ff99c74c28 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 21 Jan 2025 17:46:39 +0530 Subject: [PATCH 18/19] refactor: DRY stream writer creation --- src/event/writer/mod.rs | 77 ++++++++++------------------------------- 1 file changed, 19 insertions(+), 58 deletions(-) diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index 5c21e5d31..895cd59ed 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -85,26 +85,22 @@ impl WriterTable { custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { - let has_stream = self.read().unwrap().contains_key(stream_name); - if has_stream { - self.handle_existing_writer( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - stream_type, - )?; - } else { - self.handle_missing_writer( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - stream_type, - )?; - }; + if !self.read().unwrap().contains_key(stream_name) { + // Gets write privileges only for inserting a writer + self.write() + .unwrap() + .insert(stream_name.to_owned(), Mutex::new(Writer::default())); + } + + // Updates the writer with only read privileges + self.handle_existing_writer( + stream_name, + schema_key, + record, + parsed_timestamp, + custom_partition_values, + stream_type, + )?; Ok(()) } @@ -120,48 +116,13 @@ impl WriterTable { stream_type: &StreamType, ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); - let mut stream_writer = hashmap_guard + let mut writer = hashmap_guard .get(stream_name) .expect("Stream exists") .lock() .unwrap(); if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - stream_writer.push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - } else { - stream_writer.push_mem(stream_name, record)?; - } - - Ok(()) - } - - /// Construct a writer for new stream of data - /// - /// TODO: verify with model checker - fn handle_missing_writer( - &self, - stream_name: &str, - schema_key: &str, - record: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), StreamWriterError> { - // Gets write privileges only for inserting a writer - self.write() - .unwrap() - .insert(stream_name.to_owned(), Mutex::new(Writer::default())); - // Updates the writer with read privileges - let hashmap_guard = self.read().unwrap(); - let writer = hashmap_guard.get(stream_name).expect("Stream exists"); - - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - writer.lock().unwrap().push( + writer.push( stream_name, schema_key, record, @@ -169,7 +130,7 @@ impl WriterTable { custom_partition_values, )?; } else 
{ - writer.lock().unwrap().push_mem(stream_name, record)?; + writer.push_mem(stream_name, record)?; } Ok(()) From d61d72d1047cb5327edfb50ef59754263a9fd205 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 22 Jan 2025 16:16:57 +0530 Subject: [PATCH 19/19] refactor: serde `StreamType` --- src/event/format/mod.rs | 7 +++-- src/handlers/http/ingest.rs | 27 +++++-------------- src/handlers/http/logstream.rs | 13 ++++----- .../http/modal/utils/logstream_utils.rs | 17 +++++++----- src/kafka.rs | 7 +---- src/metadata.rs | 2 +- src/storage/mod.rs | 12 ++++++++- src/storage/object_storage.rs | 6 ++--- src/utils/arrow/mod.rs | 8 +++--- src/validator.rs | 7 +++-- 10 files changed, 52 insertions(+), 54 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index a574c137d..9d83d2d48 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -30,7 +30,10 @@ use chrono::DateTime; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::{metadata::SchemaVersion, utils::arrow::{get_field, get_timestamp_array, replace_columns}}; +use crate::{ + metadata::SchemaVersion, + utils::arrow::{get_field, get_timestamp_array, replace_columns}, +}; use super::DEFAULT_TIMESTAMP_KEY; @@ -141,7 +144,7 @@ pub trait EventFormat: Sized { } new_schema = update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); - + let mut rb = Self::decode(data, new_schema.clone())?; rb = replace_columns( rb.schema(), diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 8d70a348d..b3da07761 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -64,12 +64,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result { let mut stream_exists = false; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 5d95bd2de..e106a2de9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -496,11 +496,11 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: bool, schema: Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - if stream_type != StreamType::Internal.to_string() { + if stream_type != StreamType::Internal { validator::stream_name(&stream_name, stream_type)?; } // Proceed to create log stream if it doesn't exist @@ -735,12 +735,9 @@ pub async fn delete_stream_hot_tier( } pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> { - if let Ok(stream_exists) = create_stream_if_not_exists( - INTERNAL_STREAM_NAME, - &StreamType::Internal.to_string(), - LogSource::Pmeta, - ) - .await + if let Ok(stream_exists) = + create_stream_if_not_exists(INTERNAL_STREAM_NAME, StreamType::Internal, LogSource::Pmeta) + .await { if stream_exists { return Ok(()); diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index e73de1ee5..cdc338ad8 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -114,7 +114,7 @@ pub async fn create_update_stream( &custom_partition, static_schema_flag, schema, - &stream_type, + stream_type, log_source, ) .await?; @@ -175,7 +175,7 @@ pub struct PutStreamHeaders { pub custom_partition: String, pub static_schema_flag: bool, pub update_stream_flag: bool, - pub stream_type: String, + pub stream_type: StreamType, pub log_source: LogSource, } @@ -202,8 +202,8 @@ impl 
From<&HeaderMap> for PutStreamHeaders { .is_some_and(|v| v.to_str().unwrap() == "true"), stream_type: headers .get(STREAM_TYPE_KEY) - .map_or("", |v| v.to_str().unwrap()) - .to_string(), + .map(|v| StreamType::from(v.to_str().unwrap())) + .unwrap_or_default(), log_source: headers .get(LOG_SOURCE_KEY) .map_or(LogSource::default(), |v| v.to_str().unwrap().into()), @@ -392,11 +392,11 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: bool, schema: Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - if stream_type != StreamType::Internal.to_string() { + if stream_type != StreamType::Internal { validator::stream_name(&stream_name, stream_type)?; } // Proceed to create log stream if it doesn't exist @@ -484,7 +484,10 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag; - let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); + let stream_type = stream_metadata + .stream_type + .map(|s| StreamType::from(s.as_str())) + .unwrap_or_default(); let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; metadata::STREAM_INFO.add_stream( diff --git a/src/kafka.rs b/src/kafka.rs index 8293b4e1f..9ba697a97 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -181,12 +181,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { let stream_name = msg.topic(); // stream should get created only if there is an incoming event, not before that - create_stream_if_not_exists( - stream_name, - &StreamType::UserDefined.to_string(), - LogSource::default(), - ) - .await?; + create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::default()).await?; let schema = resolve_schema(stream_name)?; let event = format::json::Event { diff --git a/src/metadata.rs b/src/metadata.rs index c3ff0fce7..5c18aa329 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -271,7 +271,7 @@ impl StreamInfo { custom_partition: String, static_schema_flag: bool, static_schema: HashMap>, - stream_type: &str, + stream_type: StreamType, schema_version: SchemaVersion, log_source: LogSource, ) { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 454a17ebe..85c46dade 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -144,13 +144,23 @@ pub struct StreamInfo { pub log_source: LogSource, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] pub enum StreamType { #[default] UserDefined, Internal, } +impl From<&str> for StreamType { + fn from(stream_type: &str) -> Self { + match stream_type { + "UserDefined" => Self::UserDefined, + "Internal" => Self::Internal, + t => panic!("Unexpected stream type: {t}"), + } + } +} + impl std::fmt::Display for StreamType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 05c046179..16d526c8a 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,8 +21,8 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - Owner, ALERT_FILE_NAME, MANIFEST_FILE, 
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + Owner, StreamType, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::event::format::LogSource; @@ -156,7 +156,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { custom_partition: &str, static_schema_flag: bool, schema: Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result { let format = ObjectStoreFormat { diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index 93fb271b0..2cbdbf0a5 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -127,13 +127,13 @@ pub fn get_field<'a>( } /// Constructs an array of the current timestamp. -/// +/// /// # Arguments -/// +/// /// * `size` - The number of rows for which timestamp values are to be added. -/// +/// /// # Returns -/// +/// /// A column in arrow, containing the current timestamp in millis. pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) diff --git a/src/validator.rs b/src/validator.rs index bfa1dae02..bcbaefea6 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -77,7 +77,10 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { Ok(()) } -pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNameValidationError> { +pub fn stream_name( + stream_name: &str, + stream_type: StreamType, +) -> Result<(), StreamNameValidationError> { if stream_name.is_empty() { return Err(StreamNameValidationError::EmptyName); } @@ -102,7 +105,7 @@ pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNam )); } - if stream_type == StreamType::Internal.to_string() { + if stream_type == StreamType::Internal { return Err(StreamNameValidationError::InternalStream( stream_name.to_owned(), ));
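
---
Note: the recurring change across these patches is the move from manual `req.match_info().get(..).unwrap()` lookups and raw `Bytes` bodies to actix-web's typed `Path` and `Json` extractors. The following is a minimal, illustrative sketch of that pattern only; it is not part of the patches above, and the route path, payload type, and handler name are hypothetical. It assumes actix-web with its default `macros` feature and serde with `derive`.

use actix_web::{
    web::{self, Json, Path},
    App, HttpResponse, HttpServer, Responder,
};
use serde::Deserialize;

// Hypothetical payload type, standing in for `Retention`, `Alerts`, etc.
#[derive(Deserialize)]
struct Config {
    days: u32,
}

// `Path<String>` pulls the `{logstream}` segment out of the matched route and
// `Json<Config>` deserializes the request body, so the handler never reaches
// into `HttpRequest` for these values and never unwraps: extraction failures
// become error responses instead of panics.
async fn put_config(stream_name: Path<String>, Json(config): Json<Config>) -> impl Responder {
    let stream_name = stream_name.into_inner();
    HttpResponse::Ok().body(format!("{stream_name}: retain {} days", config.days))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().route(
            "/api/v1/logstream/{logstream}/config",
            web::put().to(put_config),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}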