From 1e0534e888313cb65f43dacab1e8e054749aecbd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 10 Jan 2024 19:19:39 +0530 Subject: [PATCH 1/4] fix for #605: added logic for decoding and flattening kinesis log before ingesting to Parseable --- server/src/handlers.rs | 5 ++ server/src/handlers/http.rs | 1 + server/src/handlers/http/ingest.rs | 44 +++++++++++++---- server/src/handlers/http/kinesis.rs | 66 ++++++++++++++++++++++++++ server/src/handlers/http/middleware.rs | 9 +++- 5 files changed, 116 insertions(+), 9 deletions(-) create mode 100644 server/src/handlers/http/kinesis.rs diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 67ed808ca..352537580 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -24,9 +24,14 @@ const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const AUTHORIZATION_KEY: &str = "authorization"; const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; +const LOG_SOURCE_KEY: &str = "x-p-log-source"; const SEPARATOR: char = '^'; const OIDC_SCOPE: &str = "openid profile email"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; const USER_COOKIE_NAME: &str = "username"; + +// constants for Log Source values for known sources +const LOG_SOURCE_VALUE_FOR_KINEIS: &str = "kinesis"; +const LOG_SOURCE_VALUE_FOR_OTEL: &str = "otel"; diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 06fbf1221..9ee796a23 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -40,6 +40,7 @@ use self::middleware::{DisAllowRootUser, RouteExt}; mod about; mod health_check; mod ingest; +mod kinesis; mod llm; mod logstream; mod middleware; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 5ac41cae2..5d4a00719 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,23 +16,25 @@ * */ -use std::collections::HashMap; -use std::sync::Arc; - -use actix_web::http::header::ContentType; -use actix_web::{HttpRequest, HttpResponse}; +use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_schema::Field; use bytes::Bytes; use http::StatusCode; use serde_json::Value; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; -use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY}; +use crate::handlers::{ + LOG_SOURCE_KEY, LOG_SOURCE_VALUE_FOR_KINEIS, LOG_SOURCE_VALUE_FOR_OTEL, PREFIX_META, + PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, +}; use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use super::kinesis; use super::logstream::error::CreateStreamError; // Handler for POST /api/v1/ingest @@ -46,19 +48,45 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { + //flatten logs + if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) { + let mut json: Vec> = Vec::new(); + let log_source: String = log_source.to_str().unwrap().to_owned(); + match log_source.as_str() { + LOG_SOURCE_VALUE_FOR_KINEIS => json = kinesis::flatten_kinesis_logs(&body), + LOG_SOURCE_VALUE_FOR_OTEL => {} + _ => {} + } + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } + } else { + push_logs(stream_name.to_string(), req, body).await?; + } + Ok(()) +} + // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - push_logs(stream_name, req, body).await?; + + flatten_and_push_logs(req, body, stream_name).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/handlers/http/kinesis.rs b/server/src/handlers/http/kinesis.rs new file mode 100644 index 000000000..111548328 --- /dev/null +++ b/server/src/handlers/http/kinesis.rs @@ -0,0 +1,66 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use base64::{engine::general_purpose::STANDARD, Engine as _}; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::BTreeMap; +use std::str; + +#[derive(Serialize, Deserialize, Debug)] +struct Message { + #[serde(rename = "records")] + records: Vec, + #[serde(rename = "requestId")] + request_id: String, + timestamp: u64, +} +#[derive(Serialize, Deserialize, Debug)] +struct Data { + #[serde(rename = "data")] + data: String, +} + +pub fn flatten_kinesis_logs(body: &Bytes) -> Vec> { + let body_str = std::str::from_utf8(body).unwrap(); + let message: Message = serde_json::from_str(body_str).unwrap(); + let mut vec_kinesis_json: Vec> = Vec::new(); + + for record in message.records.iter() { + let bytes = STANDARD.decode(record.data.clone()).unwrap(); + let json_string: String = String::from_utf8(bytes).unwrap(); + let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); + let mut kinesis_json: BTreeMap = match serde_json::from_value(json) { + Ok(value) => value, + Err(error) => panic!("Failed to deserialize JSON: {}", error), + }; + + kinesis_json.insert( + "requestId".to_owned(), + Value::String(message.request_id.clone()), + ); + kinesis_json.insert( + "timestamp".to_owned(), + Value::String(message.timestamp.to_string()), + ); + + vec_kinesis_json.push(kinesis_json); + } + vec_kinesis_json +} diff --git a/server/src/handlers/http/middleware.rs b/server/src/handlers/http/middleware.rs index ffecd7dbb..a5b38628c 100644 --- a/server/src/handlers/http/middleware.rs +++ b/server/src/handlers/http/middleware.rs @@ -27,7 +27,10 @@ use actix_web::{ }; use futures_util::future::LocalBoxFuture; -use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY}; +use crate::handlers::{ + AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_VALUE_FOR_KINEIS, + STREAM_NAME_HEADER_KEY, +}; use crate::{ option::CONFIG, rbac::Users, @@ -149,6 +152,10 @@ where header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone()) .unwrap(), ); + req.headers_mut().insert( + HeaderName::from_static(LOG_SOURCE_KEY), + header::HeaderValue::from_static(LOG_SOURCE_VALUE_FOR_KINEIS), + ); } /* ## Section end */ From c3cc337f5734a061b85e1e6b5890e4e5112ad7d2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhacloudsurfex@users.noreply.github.com> Date: Wed, 10 Jan 2024 19:22:42 +0530 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Nitish Tiwari Signed-off-by: Nikhil Sinha <131262146+nikhilsinhacloudsurfex@users.noreply.github.com> --- server/src/handlers.rs | 4 ++-- server/src/handlers/http/ingest.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 352537580..c479fab85 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -33,5 +33,5 @@ const SESSION_COOKIE_NAME: &str = "session"; const USER_COOKIE_NAME: &str = "username"; // constants for Log Source values for known sources -const LOG_SOURCE_VALUE_FOR_KINEIS: &str = "kinesis"; -const LOG_SOURCE_VALUE_FOR_OTEL: &str = "otel"; +const LOG_SOURCE_KINESIS: &str = "kinesis"; +const LOG_SOURCE_OTEL: &str = "otel"; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 5d4a00719..bf4b8b937 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -28,7 +28,7 @@ use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; use crate::handlers::{ - LOG_SOURCE_KEY, LOG_SOURCE_VALUE_FOR_KINEIS, LOG_SOURCE_VALUE_FOR_OTEL, PREFIX_META, + LOG_SOURCE_KEY, LOG_SOURCE_KINEIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; From 933dbcb1ce3be9a5f7a01eb218b592f7fb2887bd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 10 Jan 2024 19:40:50 +0530 Subject: [PATCH 3/4] fixed incorrect const values for custom headers used in middleware and ingest.rs --- server/src/handlers/http/ingest.rs | 8 ++++---- server/src/handlers/http/middleware.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index bf4b8b937..bd7bc04d3 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -28,8 +28,8 @@ use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; use crate::handlers::{ - LOG_SOURCE_KEY, LOG_SOURCE_KINEIS, LOG_SOURCE_OTEL, PREFIX_META, - PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, + STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; @@ -66,8 +66,8 @@ async fn flatten_and_push_logs( let mut json: Vec> = Vec::new(); let log_source: String = log_source.to_str().unwrap().to_owned(); match log_source.as_str() { - LOG_SOURCE_VALUE_FOR_KINEIS => json = kinesis::flatten_kinesis_logs(&body), - LOG_SOURCE_VALUE_FOR_OTEL => {} + LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), + LOG_SOURCE_OTEL => {} _ => {} } for record in json.iter_mut() { diff --git a/server/src/handlers/http/middleware.rs b/server/src/handlers/http/middleware.rs index a5b38628c..8cc5330ba 100644 --- a/server/src/handlers/http/middleware.rs +++ b/server/src/handlers/http/middleware.rs @@ -28,7 +28,7 @@ use actix_web::{ use futures_util::future::LocalBoxFuture; use crate::handlers::{ - AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_VALUE_FOR_KINEIS, + AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, STREAM_NAME_HEADER_KEY, }; use crate::{ @@ -154,7 +154,7 @@ where ); req.headers_mut().insert( HeaderName::from_static(LOG_SOURCE_KEY), - header::HeaderValue::from_static(LOG_SOURCE_VALUE_FOR_KINEIS), + header::HeaderValue::from_static(LOG_SOURCE_KINESIS), ); } From f38052358feefe92eb351bb6189065381e8eebe6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 11 Jan 2024 14:59:03 +0530 Subject: [PATCH 4/4] added logic for unknown log sources - log a warning and ingest the log --- server/src/handlers/http/ingest.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index bd7bc04d3..be933fe18 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -68,7 +68,10 @@ async fn flatten_and_push_logs( match log_source.as_str() { LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), LOG_SOURCE_OTEL => {} - _ => {} + _ => { + log::warn!("Unknown log source: {}", log_source); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } } for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into();