Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use chrono::Utc;
use http::StatusCode;
use serde_json::Value;

use crate::event;
use crate::event::error::EventError;
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
use crate::metadata::SchemaVersion;
use crate::option::Mode;
Expand Down Expand Up @@ -124,7 +124,9 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
let size: usize = body.len();
let json: Value = serde_json::from_slice(&body)?;
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();

let mut p_custom_fields = HashMap::new();
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "Parseable".to_string());
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
// For internal streams, use old schema
format::json::Event::new(json)
.into_event(
Expand All @@ -136,7 +138,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
None,
SchemaVersion::V0,
StreamType::Internal,
&HashMap::new(),
&p_custom_fields,
)?
.process()?;

Expand Down
Loading