Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const AUTHORIZATION_KEY: &str = "authorization";
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const SEPARATOR: char = '^';

const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
const USER_COOKIE_NAME: &str = "username";

// constants for Log Source values for known sources
const LOG_SOURCE_KINESIS: &str = "kinesis";
const LOG_SOURCE_OTEL: &str = "otel";
1 change: 1 addition & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use self::middleware::{DisAllowRootUser, RouteExt};
mod about;
mod health_check;
mod ingest;
mod kinesis;
mod llm;
mod logstream;
mod middleware;
Expand Down
47 changes: 39 additions & 8 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,25 @@
*
*/

use std::collections::HashMap;
use std::sync::Arc;

use actix_web::http::header::ContentType;
use actix_web::{HttpRequest, HttpResponse};
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_schema::Field;
use bytes::Bytes;
use http::StatusCode;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use crate::event::error::EventError;
use crate::event::format::EventFormat;
use crate::event::{self, format};
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY};
use crate::handlers::{
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
STREAM_NAME_HEADER_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};

use super::kinesis;
use super::logstream::error::CreateStreamError;

// Handler for POST /api/v1/ingest
Expand All @@ -46,19 +48,48 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name).await?;
push_logs(stream_name, req, body).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
}
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
stream_name: String,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => {}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
}
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
push_logs(stream_name.to_string(), req, body).await?;
}
Ok(())
}

// Handler for POST /api/v1/logstream/{logstream}
// only ingests events into the specified logstream
// fails if the logstream does not exist
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
push_logs(stream_name, req, body).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
}

Expand Down
66 changes: 66 additions & 0 deletions server/src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use base64::{engine::general_purpose::STANDARD, Engine as _};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::str;

#[derive(Serialize, Deserialize, Debug)]
struct Message {
#[serde(rename = "records")]
records: Vec<Data>,
#[serde(rename = "requestId")]
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
#[serde(rename = "data")]
data: String,
}

pub fn flatten_kinesis_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let body_str = std::str::from_utf8(body).unwrap();
let message: Message = serde_json::from_str(body_str).unwrap();
let mut vec_kinesis_json: Vec<BTreeMap<String, Value>> = Vec::new();

for record in message.records.iter() {
let bytes = STANDARD.decode(record.data.clone()).unwrap();
let json_string: String = String::from_utf8(bytes).unwrap();
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
let mut kinesis_json: BTreeMap<String, Value> = match serde_json::from_value(json) {
Ok(value) => value,
Err(error) => panic!("Failed to deserialize JSON: {}", error),
};

kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);

vec_kinesis_json.push(kinesis_json);
}
vec_kinesis_json
}
9 changes: 8 additions & 1 deletion server/src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use actix_web::{
};
use futures_util::future::LocalBoxFuture;

use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
use crate::handlers::{
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
STREAM_NAME_HEADER_KEY,
};
use crate::{
option::CONFIG,
rbac::Users,
Expand Down Expand Up @@ -149,6 +152,10 @@ where
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
.unwrap(),
);
req.headers_mut().insert(
HeaderName::from_static(LOG_SOURCE_KEY),
header::HeaderValue::from_static(LOG_SOURCE_KINESIS),
);
}

/* ## Section end */
Expand Down