Skip to content

Commit bcc8a2d

Browse files
fix for #605: Decode and flatten logs from kinesis before ingesting on Parseable
1 parent 56c3cc9 commit bcc8a2d

File tree

5 files changed

+75
-44
lines changed

5 files changed

+75
-44
lines changed

server/src/handlers.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,7 @@ const OIDC_SCOPE: &str = "openid profile email";
3131
const COOKIE_AGE_DAYS: usize = 7;
3232
const SESSION_COOKIE_NAME: &str = "session";
3333
const USER_COOKIE_NAME: &str = "username";
34+
35+
// constants for Log Source values for known sources
36+
const LOG_SOURCE_VALUE_FOR_KINEIS: &str = "kinesis";
37+
const LOG_SOURCE_VALUE_FOR_OTEL: &str = "otel";

server/src/handlers/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ use self::middleware::{DisAllowRootUser, RouteExt};
4040
mod about;
4141
mod health_check;
4242
mod ingest;
43+
mod kinesis;
4344
mod llm;
4445
mod logstream;
4546
mod middleware;
4647
mod oidc;
4748
mod query;
4849
mod rbac;
4950
mod role;
50-
mod kinesis;
5151

5252
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
5353

server/src/handlers/http/ingest.rs

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,26 @@
1616
*
1717
*/
1818

19-
use std::collections::HashMap;
20-
use std::sync::Arc;
21-
22-
use actix_web::http::header::ContentType;
23-
use actix_web::{HttpRequest, HttpResponse};
19+
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
2420
use arrow_schema::Field;
2521
use bytes::Bytes;
2622
use http::StatusCode;
2723
use serde_json::Value;
24+
use std::collections::{BTreeMap,HashMap};
25+
use std::sync::Arc;
2826

2927
use crate::event::error::EventError;
3028
use crate::event::format::EventFormat;
3129
use crate::event::{self, format};
32-
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY};
30+
use crate::handlers::{
31+
LOG_SOURCE_KEY, PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY,
32+
LOG_SOURCE_VALUE_FOR_KINEIS, LOG_SOURCE_VALUE_FOR_OTEL
33+
};
3334
use crate::metadata::STREAM_INFO;
3435
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3536

36-
use super::logstream::error::CreateStreamError;
3737
use super::kinesis;
38+
use super::logstream::error::CreateStreamError;
3839

3940
// Handler for POST /api/v1/ingest
4041
// ingests events by extracting stream name from header
@@ -48,40 +49,44 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4849
let stream_name = stream_name.to_str().unwrap().to_owned();
4950
create_stream_if_not_exists(&stream_name).await?;
5051

51-
//section to flatten ingested log data
52-
let cloned_req = req.clone();
53-
flatten_logs(cloned_req, &body);
54-
//section ends
55-
push_logs(stream_name, req, body).await?;
52+
flatten_and_push_logs(req, body, stream_name).await?;
5653
Ok(HttpResponse::Ok().finish())
5754
} else {
5855
Err(PostError::Header(ParseHeaderError::MissingStreamName))
5956
}
6057
}
6158

62-
fn flatten_logs(req: HttpRequest, body: &Bytes){
59+
async fn flatten_and_push_logs(
60+
req: HttpRequest,
61+
body: Bytes,
62+
stream_name: String,
63+
) -> Result<(), PostError> {
6364
//flatten logs
64-
if let Some((_, log_source)) = req
65-
.headers()
66-
.iter()
67-
.find(|&(key, _)| key == LOG_SOURCE_KEY)
68-
{
65+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
66+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
6967
let log_source: String = log_source.to_str().unwrap().to_owned();
70-
//println!("log source: {}", log_source);
71-
match log_source.as_str(){
72-
"kinesis_firehose" => kinesis::flatten_kinesis_logs(body),
73-
_ => {}//do nothing so far
74-
};
68+
match log_source.as_str() {
69+
LOG_SOURCE_VALUE_FOR_KINEIS => json = kinesis::flatten_kinesis_logs(&body),
70+
LOG_SOURCE_VALUE_FOR_OTEL => {}
71+
_ => {}
72+
}
73+
for record in json.iter_mut() {
74+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
75+
push_logs(stream_name.to_string(), req.clone(), body).await?;
76+
}
77+
} else {
78+
push_logs(stream_name.to_string(), req, body).await?;
7579
}
80+
Ok(())
7681
}
7782

78-
7983
// Handler for POST /api/v1/logstream/{logstream}
8084
// only ingests events into the specified logstream
8185
// fails if the logstream does not exist
8286
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
8387
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
84-
push_logs(stream_name, req, body).await?;
88+
89+
flatten_and_push_logs(req, body, stream_name).await?;
8590
Ok(HttpResponse::Ok().finish())
8691
}
8792

@@ -93,7 +98,6 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
9398
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
9499
.schema
95100
.clone();
96-
97101
into_event_batch(req, body, schema)?
98102
};
99103

@@ -118,15 +122,12 @@ fn into_event_batch(
118122
let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
119123
let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
120124
let size = body.len();
121-
println!("{:?}", body);
122125
let body: Value = serde_json::from_slice(&body)?;
123126
let event = format::json::Event {
124127
data: body,
125128
tags,
126129
metadata,
127130
};
128-
129-
println!("{:?}", event);
130131
let (rb, is_first) = event.into_recordbatch(schema)?;
131132
Ok((size, rb, is_first))
132133
}
Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,48 @@
1+
use base64::{engine::general_purpose::STANDARD, Engine as _};
12
use bytes::Bytes;
23
use serde::{Deserialize, Serialize};
3-
use base64::{engine::general_purpose::STANDARD, Engine as _};
4+
use serde_json::Value;
5+
use std::collections::BTreeMap;
46
use std::str;
57

6-
78
#[derive(Serialize, Deserialize, Debug)]
89
struct Message {
9-
#[serde(rename = "records_data")]
10-
records_data: Vec<String>,
10+
#[serde(rename = "records")]
11+
records: Vec<Data>,
1112
#[serde(rename = "requestId")]
1213
request_id: String,
13-
timestamp: u64
14+
timestamp: u64,
15+
}
16+
#[derive(Serialize, Deserialize, Debug)]
17+
struct Data {
18+
#[serde(rename = "data")]
19+
data: String,
1420
}
1521

16-
pub fn flatten_kinesis_logs(body: &Bytes){
22+
pub fn flatten_kinesis_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
1723
let body_str = std::str::from_utf8(body).unwrap();
1824
let message: Message = serde_json::from_str(body_str).unwrap();
19-
println!("{:?}", message.records_data);
20-
println!("{:?}", message.request_id);
21-
println!("{:?}", message.timestamp);
25+
let mut vec_kinesis_json: Vec<BTreeMap<String, Value>> = Vec::new();
2226

27+
for record in message.records.iter() {
28+
let bytes = STANDARD.decode(record.data.clone()).unwrap();
29+
let json_string: String = String::from_utf8(bytes).unwrap();
30+
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
31+
let mut kinesis_json: BTreeMap<String, Value> = match serde_json::from_value(json) {
32+
Ok(value) => value,
33+
Err(error) => panic!("Failed to deserialize JSON: {}", error),
34+
};
2335

24-
for record in message.records_data.iter() {
25-
let bytes = STANDARD.decode(record).unwrap();
26-
println!("{:?}", String::from_utf8(bytes).unwrap());
36+
kinesis_json.insert(
37+
"requestId".to_owned(),
38+
Value::String(message.request_id.clone()),
39+
);
40+
kinesis_json.insert(
41+
"timestamp".to_owned(),
42+
Value::String(message.timestamp.to_string()),
43+
);
2744

45+
vec_kinesis_json.push(kinesis_json);
2846
}
29-
}
47+
vec_kinesis_json
48+
}

server/src/handlers/http/middleware.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use actix_web::{
2727
};
2828
use futures_util::future::LocalBoxFuture;
2929

30-
use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
30+
use crate::handlers::{
31+
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY,
32+
STREAM_NAME_HEADER_KEY, LOG_SOURCE_VALUE_FOR_KINEIS
33+
};
3134
use crate::{
3235
option::CONFIG,
3336
rbac::Users,
@@ -149,6 +152,10 @@ where
149152
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
150153
.unwrap(),
151154
);
155+
req.headers_mut().insert(
156+
HeaderName::from_static(LOG_SOURCE_KEY),
157+
header::HeaderValue::from_static(LOG_SOURCE_VALUE_FOR_KINEIS),
158+
);
152159
}
153160

154161
/* ## Section end */

0 commit comments

Comments
 (0)