Skip to content

Commit 56c3cc9

Browse files
fix for #605: added feature for flattening kinesis logs before ingesting
1 parent 16cb5eb commit 56c3cc9

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
2424
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2525
const AUTHORIZATION_KEY: &str = "authorization";
2626
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
27+
// Request header whose value names the source of the ingested logs
// (for example "kinesis_firehose").
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2728
const SEPARATOR: char = '^';
2829

2930
const OIDC_SCOPE: &str = "openid profile email";

server/src/handlers/http.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ mod oidc;
4747
mod query;
4848
mod rbac;
4949
mod role;
50+
mod kinesis;
5051

5152
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
5253

server/src/handlers/http/ingest.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ use serde_json::Value;
2929
use crate::event::error::EventError;
3030
use crate::event::format::EventFormat;
3131
use crate::event::{self, format};
32-
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY};
32+
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY};
3333
use crate::metadata::STREAM_INFO;
3434
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3535

3636
use super::logstream::error::CreateStreamError;
37+
use super::kinesis;
3738

3839
// Handler for POST /api/v1/ingest
3940
// ingests events by extracting stream name from header
@@ -46,13 +47,35 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4647
{
4748
let stream_name = stream_name.to_str().unwrap().to_owned();
4849
create_stream_if_not_exists(&stream_name).await?;
50+
51+
//section to flatten ingested log data
52+
let cloned_req = req.clone();
53+
flatten_logs(cloned_req, &body);
54+
//section ends
4955
push_logs(stream_name, req, body).await?;
5056
Ok(HttpResponse::Ok().finish())
5157
} else {
5258
Err(PostError::Header(ParseHeaderError::MissingStreamName))
5359
}
5460
}
5561

62+
fn flatten_logs(req: HttpRequest, body: &Bytes){
63+
//flatten logs
64+
if let Some((_, log_source)) = req
65+
.headers()
66+
.iter()
67+
.find(|&(key, _)| key == LOG_SOURCE_KEY)
68+
{
69+
let log_source: String = log_source.to_str().unwrap().to_owned();
70+
//println!("log source: {}", log_source);
71+
match log_source.as_str(){
72+
"kinesis_firehose" => kinesis::flatten_kinesis_logs(body),
73+
_ => {}//do nothing so far
74+
};
75+
}
76+
}
77+
78+
5679
// Handler for POST /api/v1/logstream/{logstream}
5780
// only ingests events into the specified logstream
5881
// fails if the logstream does not exist
@@ -70,6 +93,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
7093
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
7194
.schema
7295
.clone();
96+
7397
into_event_batch(req, body, schema)?
7498
};
7599

@@ -94,12 +118,15 @@ fn into_event_batch(
94118
let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
95119
let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
96120
let size = body.len();
121+
println!("{:?}", body);
97122
let body: Value = serde_json::from_slice(&body)?;
98123
let event = format::json::Event {
99124
data: body,
100125
tags,
101126
metadata,
102127
};
128+
129+
println!("{:?}", event);
103130
let (rb, is_first) = event.into_recordbatch(schema)?;
104131
Ok((size, rb, is_first))
105132
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use bytes::Bytes;
2+
use serde::{Deserialize, Serialize};
3+
use base64::{engine::general_purpose::STANDARD, Engine as _};
4+
use std::str;
5+
6+
7+
#[derive(Serialize, Deserialize, Debug)]
8+
struct Message {
9+
#[serde(rename = "records_data")]
10+
records_data: Vec<String>,
11+
#[serde(rename = "requestId")]
12+
request_id: String,
13+
timestamp: u64
14+
}
15+
16+
pub fn flatten_kinesis_logs(body: &Bytes){
17+
let body_str = std::str::from_utf8(body).unwrap();
18+
let message: Message = serde_json::from_str(body_str).unwrap();
19+
println!("{:?}", message.records_data);
20+
println!("{:?}", message.request_id);
21+
println!("{:?}", message.timestamp);
22+
23+
24+
for record in message.records_data.iter() {
25+
let bytes = STANDARD.decode(record).unwrap();
26+
println!("{:?}", String::from_utf8(bytes).unwrap());
27+
28+
}
29+
}

0 commit comments

Comments
 (0)