diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index ab9116eb7..40559a271 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -29,7 +29,7 @@ use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
-use super::{EventFormat, LogSource, Metadata, Tags};
+use super::{EventFormat, LogSource};
 use crate::{
     metadata::SchemaVersion,
     utils::{arrow::get_field, json::flatten_json_body},
@@ -37,8 +37,6 @@ use crate::{
 
 pub struct Event {
     pub data: Value,
-    pub tags: Tags,
-    pub metadata: Metadata,
 }
 
 impl EventFormat for Event {
@@ -53,7 +51,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         log_source: &LogSource,
-    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
         let data = flatten_json_body(
             self.data,
             None,
@@ -119,7 +117,7 @@ impl EventFormat for Event {
             ));
         }
 
-        Ok((value_arr, schema, is_first, self.tags, self.metadata))
+        Ok((value_arr, schema, is_first))
     }
 
     // Convert the Data type (defined above) to arrow record batch
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 593e82f1e..ffff27161 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -23,24 +23,18 @@ use std::{
 };
 
 use anyhow::{anyhow, Error as AnyError};
-use arrow_array::{RecordBatch, StringArray};
+use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::DateTime;
 use serde_json::Value;
 
-use crate::{
-    metadata::SchemaVersion,
-    utils::{self, arrow::get_field},
-};
+use crate::{metadata::SchemaVersion, utils::arrow::get_field};
 
-use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
+use super::DEFAULT_TIMESTAMP_KEY;
 
 pub mod json;
 
 static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
-
-type Tags = String;
-type Metadata = String;
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
@@ -87,7 +81,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         log_source: &LogSource,
-    ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
+    ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
 
@@ -99,7 +93,7 @@
         schema_version: SchemaVersion,
         log_source: &LogSource,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first, tags, metadata) = self.to_data(
+        let (data, mut schema, is_first) = self.to_data(
             storage_schema,
             static_schema_flag,
             time_partition,
@@ -107,18 +101,6 @@
             log_source,
         )?;
 
-        // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
-        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
-            return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
-        };
-
-        if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
-            return Err(anyhow!(
-                "field {} is a reserved field",
-                DEFAULT_METADATA_KEY
-            ));
-        };
-
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
                 "field {} is a reserved field",
@@ -136,16 +118,6 @@
             )),
         );
 
-        // p_tags and p_metadata are added to the end of the schema
-        let tags_index = schema.len();
-        let metadata_index = tags_index + 1;
-        schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
-        schema.push(Arc::new(Field::new(
-            DEFAULT_METADATA_KEY,
-            DataType::Utf8,
-            true,
-        )));
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -154,16 +126,6 @@
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
         let rb = Self::decode(data, new_schema.clone())?;
-        let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
-        let metadata_arr =
-            StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
-        // modify the record batch to add fields to respective indexes
-        let rb = utils::arrow::replace_columns(
-            Arc::clone(&new_schema),
-            &rb,
-            &[tags_index, metadata_index],
-            &[Arc::new(tags_arr), Arc::new(metadata_arr)],
-        );
 
         Ok((rb, is_first))
     }
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 42773ed12..2e9bc7359 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -33,8 +33,6 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
-pub const DEFAULT_TAGS_KEY: &str = "p_tags";
-pub const DEFAULT_METADATA_KEY: &str = "p_metadata";
 
 #[derive(Clone)]
 pub struct Event {
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 3b5c327c7..8c6859d2b 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -87,11 +87,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
         .ok_or(PostError::StreamNotFound(stream_name.clone()))?
         .schema
         .clone();
-    let event = format::json::Event {
-        data: body_val,
-        tags: String::default(),
-        metadata: String::default(),
-    };
+    let event = format::json::Event { data: body_val };
     // For internal streams, use old schema
     event.into_recordbatch(
         &schema,
@@ -146,7 +142,7 @@ pub async fn handle_otel_logs_ingestion(
     let mut json = flatten_otel_logs(&logs);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body, &log_source).await?;
+        push_logs(&stream_name, &body, &log_source).await?;
     }
 
     Ok(HttpResponse::Ok().finish())
@@ -179,7 +175,7 @@ pub async fn handle_otel_metrics_ingestion(
     let mut json = flatten_otel_metrics(metrics);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body, &log_source).await?;
+        push_logs(&stream_name, &body, &log_source).await?;
     }
 
     Ok(HttpResponse::Ok().finish())
@@ -213,7 +209,7 @@ pub async fn handle_otel_traces_ingestion(
     let mut json = flatten_otel_traces(&traces);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body, &log_source).await?;
+        push_logs(&stream_name, &body, &log_source).await?;
     }
 
     Ok(HttpResponse::Ok().finish())
@@ -366,17 +362,14 @@ impl actix_web::ResponseError for PostError {
 
 #[cfg(test)]
 mod tests {
-    use std::{collections::HashMap, sync::Arc};
-
-    use actix_web::test::TestRequest;
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
     use serde_json::json;
+    use std::{collections::HashMap, sync::Arc};
 
     use crate::{
-        event::{self, format::LogSource},
-        handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
+        event::format::LogSource, handlers::http::modal::utils::ingest_utils::into_event_batch,
         metadata::SchemaVersion,
     };
 
@@ -412,13 +405,7 @@ mod tests {
             "b": "hello",
         });
 
-        let req = TestRequest::default()
-            .append_header((PREFIX_TAGS.to_string() + "A", "tag1"))
-            .append_header((PREFIX_META.to_string() + "C", "meta1"))
-            .to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -429,7 +416,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
-        assert_eq!(rb.num_columns(), 6);
+        assert_eq!(rb.num_columns(), 4);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from_iter([1])
@@ -442,20 +429,6 @@
             rb.column_by_name("c").unwrap().as_float64_arr().unwrap(),
             &Float64Array::from_iter([4.23])
         );
-        assert_eq!(
-            rb.column_by_name(event::DEFAULT_TAGS_KEY)
-                .unwrap()
-                .as_utf8_arr()
-                .unwrap(),
-            &StringArray::from_iter_values(["a=tag1"])
-        );
-        assert_eq!(
-            rb.column_by_name(event::DEFAULT_METADATA_KEY)
-                .unwrap()
-                .as_utf8_arr()
-                .unwrap(),
-            &StringArray::from_iter_values(["c=meta1"])
-        );
     }
 
     #[test]
@@ -466,10 +439,7 @@
             "c": null
         });
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -480,7 +450,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
-        assert_eq!(rb.num_columns(), 5);
+        assert_eq!(rb.num_columns(), 3);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from_iter([1])
@@ -507,10 +477,7 @@
             .into_iter(),
         );
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             schema,
             None,
@@ -521,7 +488,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
-        assert_eq!(rb.num_columns(), 5);
+        assert_eq!(rb.num_columns(), 3);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from_iter([1])
@@ -548,10 +515,7 @@
             .into_iter(),
         );
 
-        let req = TestRequest::default().to_http_request();
-
         assert!(into_event_batch(
-            &req,
             &json,
             schema,
             None,
@@ -575,10 +539,7 @@
             .into_iter(),
         );
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             schema,
             None,
@@ -589,17 +550,14 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
-        assert_eq!(rb.num_columns(), 3);
+        assert_eq!(rb.num_columns(), 1);
     }
 
     #[test]
     fn non_object_arr_is_err() {
         let json = json!([1]);
 
-        let req = TestRequest::default().to_http_request();
-
         assert!(into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -628,10 +586,7 @@
             },
         ]);
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -642,7 +597,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
-        assert_eq!(rb.num_columns(), 6);
+        assert_eq!(rb.num_columns(), 4);
 
         let schema = rb.schema();
         let fields = &schema.fields;
@@ -685,10 +640,7 @@
             },
         ]);
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -699,7 +651,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
-        assert_eq!(rb.num_columns(), 6);
+        assert_eq!(rb.num_columns(), 4);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from(vec![None, Some(1), Some(1)])
@@ -742,10 +694,8 @@
             ]
             .into_iter(),
         );
-        let req = TestRequest::default().to_http_request();
 
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             schema,
             None,
@@ -756,7 +706,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
-        assert_eq!(rb.num_columns(), 6);
+        assert_eq!(rb.num_columns(), 4);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from(vec![None, Some(1), Some(1)])
@@ -791,8 +741,6 @@
             },
         ]);
 
-        let req = TestRequest::default().to_http_request();
-
         let schema = fields_to_map(
             [
                 Field::new("a", DataType::Int64, true),
@@ -803,7 +751,6 @@
         );
 
         assert!(into_event_batch(
-            &req,
             &json,
             schema,
             None,
@@ -837,10 +784,7 @@
             },
         ]);
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -851,7 +795,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
-        assert_eq!(rb.num_columns(), 7);
+        assert_eq!(rb.num_columns(), 5);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
             &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
@@ -918,10 +862,7 @@
             },
         ]);
 
-        let req = TestRequest::default().to_http_request();
-
         let (rb, _) = into_event_batch(
-            &req,
             &json,
             HashMap::default(),
             None,
@@ -932,7 +873,7 @@
         .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
-        assert_eq!(rb.num_columns(), 7);
+        assert_eq!(rb.num_columns(), 5);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_float64_arr().unwrap(),
             &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 31cff6d9f..12f2c07a5 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -31,11 +31,11 @@ use crate::{
     },
     handlers::{
         http::{ingest::PostError, kinesis},
-        LOG_SOURCE_KEY, PREFIX_META, PREFIX_TAGS, SEPARATOR,
+        LOG_SOURCE_KEY,
     },
     metadata::{SchemaVersion, STREAM_INFO},
     storage::StreamType,
-    utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object},
+    utils::json::convert_array_to_object,
 };
 
 pub async fn flatten_and_push_logs(
@@ -55,7 +55,7 @@
             let json = kinesis::flatten_kinesis_logs(&body);
             for record in json.iter() {
                 let body: Bytes = serde_json::to_vec(record).unwrap().into();
-                push_logs(stream_name, &req, &body, &LogSource::default()).await?;
+                push_logs(stream_name, &body, &LogSource::default()).await?;
             }
         }
         LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
@@ -64,7 +64,7 @@
             )));
         }
         _ => {
-            push_logs(stream_name, &req, &body, &log_source).await?;
+            push_logs(stream_name, &body, &log_source).await?;
         }
     }
     Ok(())
@@ -72,7 +72,6 @@
 
 pub async fn push_logs(
     stream_name: &str,
-    req: &HttpRequest,
     body: &Bytes,
     log_source: &LogSource,
 ) -> Result<(), PostError> {
@@ -90,7 +89,6 @@
         let size = size as u64;
         create_process_record_batch(
             stream_name,
-            req,
             body_val,
             static_schema_flag.as_ref(),
             None,
@@ -120,7 +118,6 @@
             let size = value.to_string().into_bytes().len() as u64;
             create_process_record_batch(
                 stream_name,
-                req,
                 value,
                 static_schema_flag.as_ref(),
                 None,
@@ -147,7 +144,6 @@
             let size = value.to_string().into_bytes().len() as u64;
             create_process_record_batch(
                 stream_name,
-                req,
                 value,
                 static_schema_flag.as_ref(),
                 time_partition.as_ref(),
@@ -179,7 +175,6 @@
             let size = value.to_string().into_bytes().len() as u64;
             create_process_record_batch(
                 stream_name,
-                req,
                 value,
                 static_schema_flag.as_ref(),
                 time_partition.as_ref(),
@@ -199,7 +194,6 @@
 #[allow(clippy::too_many_arguments)]
 pub async fn create_process_record_batch(
     stream_name: &str,
-    req: &HttpRequest,
     value: Value,
     static_schema_flag: Option<&String>,
     time_partition: Option<&String>,
@@ -211,7 +205,6 @@
 ) -> Result<(), PostError> {
     let (rb, is_first_event) = get_stream_schema(
         stream_name,
-        req,
         &value,
         static_schema_flag,
         time_partition,
@@ -237,7 +230,6 @@
 
 pub fn get_stream_schema(
     stream_name: &str,
-    req: &HttpRequest,
     body: &Value,
     static_schema_flag: Option<&String>,
     time_partition: Option<&String>,
@@ -251,7 +243,6 @@
         .schema
         .clone();
     into_event_batch(
-        req,
         body,
         schema,
         static_schema_flag,
@@ -262,7 +253,6 @@
 }
 
 pub fn into_event_batch(
-    req: &HttpRequest,
     body: &Value,
     schema: HashMap<String, Arc<Field>>,
     static_schema_flag: Option<&String>,
@@ -270,12 +260,8 @@
     schema_version: SchemaVersion,
     log_source: &LogSource,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?;
-    let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?;
     let event = format::json::Event {
         data: body.to_owned(),
-        tags,
-        metadata,
     };
     let (rb, is_first) = event.into_recordbatch(
         &schema,
diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs
index e18990800..656a71daa 100644
--- a/src/handlers/mod.rs
+++ b/src/handlers/mod.rs
@@ -20,8 +20,6 @@ pub mod airplane;
 pub mod http;
 pub mod livetail;
 
-const PREFIX_TAGS: &str = "x-p-tag-";
-const PREFIX_META: &str = "x-p-meta-";
 pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
 const TIME_PARTITION_KEY: &str = "x-p-time-partition";
@@ -29,7 +27,6 @@ const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
 const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
 const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
-const SEPARATOR: char = '^';
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
 const STREAM_TYPE_KEY: &str = "x-p-stream-type";
 const OIDC_SCOPE: &str = "openid profile email";
diff --git a/src/kafka.rs b/src/kafka.rs
index 1ad20fc73..1a379c9ac 100644
--- a/src/kafka.rs
+++ b/src/kafka.rs
@@ -186,8 +186,6 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
     let schema = resolve_schema(stream_name)?;
     let event = format::json::Event {
         data: serde_json::from_slice(payload)?,
-        tags: String::default(),
-        metadata: String::default(),
     };
 
     let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 0ce363584..1c9aac1f9 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -154,7 +154,6 @@ impl Query {
 
     /// return logical plan with all time filters applied through
     fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
-        let filters = self.filter_tag.clone().and_then(tag_filter);
         // see https://github.com/apache/arrow-datafusion/pull/8400
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
@@ -166,7 +165,6 @@ impl Query {
                 plan.plan.as_ref().clone(),
                 self.time_range.start.naive_utc(),
                 self.time_range.end.naive_utc(),
-                filters,
                 time_partition,
             );
             LogicalPlan::Explain(Explain {
@@ -184,7 +182,6 @@ impl Query {
                 x,
                 self.time_range.start.naive_utc(),
                 self.time_range.end.naive_utc(),
-                filters,
                 time_partition,
             )
             .data
@@ -228,21 +225,10 @@ impl TreeNodeVisitor<'_> for TableScanVisitor {
     }
 }
 
-fn tag_filter(filters: Vec<String>) -> Option<Expr> {
-    filters
-        .iter()
-        .map(|literal| {
-            Expr::Column(Column::from_name(event::DEFAULT_TAGS_KEY))
-                .like(lit(format!("%{}%", literal)))
-        })
-        .reduce(or)
-}
-
 fn transform(
     plan: LogicalPlan,
     start_time: NaiveDateTime,
     end_time: NaiveDateTime,
-    filters: Option<Expr>,
     time_partition: &Option<String>,
 ) -> Transformed<LogicalPlan> {
     plan.transform(&|plan| match plan {
@@ -285,10 +271,6 @@ fn transform(
                 new_filters.push(_start_time_filter);
                 new_filters.push(_end_time_filter);
             }
-
-            if let Some(tag_filters) = filters.clone() {
-                new_filters.push(tag_filters)
-            }
             let new_filter = new_filters.into_iter().reduce(and);
             if let Some(new_filter) = new_filter {
                 let filter =
diff --git a/src/static_schema.rs b/src/static_schema.rs
index 62ea258b6..6717175d0 100644
--- a/src/static_schema.rs
+++ b/src/static_schema.rs
@@ -16,7 +16,7 @@
  *
  */
 
-use crate::event::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
+use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::utils::arrow::get_field;
 use anyhow::{anyhow, Error as AnyError};
 use serde::{Deserialize, Serialize};
@@ -144,16 +144,6 @@ fn add_parseable_fields_to_static_schema(
         let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
         schema.push(Arc::new(field));
     }
-    if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
-        return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
-    };
-
-    if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
-        return Err(anyhow!(
-            "field {} is a reserved field",
-            DEFAULT_METADATA_KEY
-        ));
-    };
 
     if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
         return Err(anyhow!(
@@ -172,14 +162,6 @@ fn add_parseable_fields_to_static_schema(
         )),
     );
 
-    // p_tags and p_metadata are added to the end of the schema
-    schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
-    schema.push(Arc::new(Field::new(
-        DEFAULT_METADATA_KEY,
-        DataType::Utf8,
-        true,
-    )));
-
    // prepare the record batch and new fields to be added
     let schema = Arc::new(Schema::new(schema));
     Ok(schema)
diff --git a/src/utils/header_parsing.rs b/src/utils/header_parsing.rs
index 89caf4d9e..4b11ce2d4 100644
--- a/src/utils/header_parsing.rs
+++ b/src/utils/header_parsing.rs
@@ -16,43 +16,7 @@
  *
  */
 
-const MAX_HEADERS_ALLOWED: usize = 10;
-use actix_web::{HttpRequest, HttpResponse, ResponseError};
-
-pub fn collect_labelled_headers(
-    req: &HttpRequest,
-    prefix: &str,
-    kv_separator: char,
-) -> Result<String, ParseHeaderError> {
-    // filter out headers which has right prefix label and convert them into str;
-    let headers = req.headers().iter().filter_map(|(key, value)| {
-        let key = key.as_str().strip_prefix(prefix)?;
-        Some((key, value))
-    });
-
-    let mut labels: Vec<String> = Vec::new();
-
-    for (key, value) in headers {
-        let value = value.to_str().map_err(|_| ParseHeaderError::InvalidValue)?;
-        if key.is_empty() {
-            return Err(ParseHeaderError::Emptykey);
-        }
-        if key.contains(kv_separator) {
-            return Err(ParseHeaderError::SeperatorInKey(kv_separator));
-        }
-        if value.contains(kv_separator) {
-            return Err(ParseHeaderError::SeperatorInValue(kv_separator));
-        }
-
-        labels.push(format!("{key}={value}"));
-    }
-
-    if labels.len() > MAX_HEADERS_ALLOWED {
-        return Err(ParseHeaderError::MaxHeadersLimitExceeded);
-    }
-
-    Ok(labels.join(&kv_separator.to_string()))
-}
+use actix_web::{HttpResponse, ResponseError};
 
 #[derive(Debug, thiserror::Error)]
 pub enum ParseHeaderError {
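
Taken together, these changes reduce a JSON event to just its payload: the p_tags/p_metadata columns, the x-p-tag-/x-p-meta- headers and their '^'-separated parsing, and the req argument threaded through ingestion all go away. The sketch below is illustrative only, not code from this PR: it mirrors the post-change signatures inside the crate, while the example function, the in-scope schema map, and the SchemaVersion::V1 variant are assumptions made for illustration.

use std::{collections::HashMap, sync::Arc};

use arrow_schema::Field;
use serde_json::json;

use crate::event::format::{self, EventFormat, LogSource};
use crate::metadata::SchemaVersion;

// Hypothetical caller: convert one JSON object into a RecordBatch for a stream
// whose schema map is already at hand (e.g. fetched from STREAM_INFO).
fn example(schema: HashMap<String, Arc<Field>>) -> anyhow::Result<()> {
    // An event now carries only its payload; no tags/metadata strings.
    let event = format::json::Event {
        data: json!({ "a": 1, "b": "hello" }),
    };

    // into_recordbatch no longer returns or injects p_tags/p_metadata columns.
    let (rb, _is_first) = event.into_recordbatch(
        &schema,
        None,                  // static_schema_flag
        None,                  // time_partition
        SchemaVersion::V1,     // assumed variant; use the stream's actual version
        &LogSource::default(),
    )?;

    // Only the flattened fields plus p_timestamp remain; compare the updated
    // tests, where a three-key object now yields 4 columns instead of 6.
    assert_eq!(rb.num_columns(), 3); // "a", "b", "p_timestamp"
    Ok(())
}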