|
18 | 18 |
|
19 | 19 | use std::collections::{HashMap, HashSet}; |
20 | 20 |
|
21 | | -use super::logstream::error::{CreateStreamError, StreamError}; |
22 | | -use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; |
23 | | -use super::users::dashboards::DashboardError; |
24 | | -use super::users::filters::FiltersError; |
25 | | -use crate::event::format::{self, EventFormat, LogSource}; |
26 | | -use crate::event::{self, error::EventError}; |
27 | | -use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; |
28 | | -use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; |
29 | | -use crate::metadata::error::stream_info::MetadataError; |
30 | | -use crate::metadata::{SchemaVersion, STREAM_INFO}; |
31 | | -use crate::option::{Mode, CONFIG}; |
32 | | -use crate::otel::logs::flatten_otel_logs; |
33 | | -use crate::otel::metrics::flatten_otel_metrics; |
34 | | -use crate::otel::traces::flatten_otel_traces; |
35 | | -use crate::storage::{ObjectStorageError, StreamType}; |
36 | | -use crate::utils::header_parsing::ParseHeaderError; |
37 | | -use crate::utils::json::convert_array_to_object; |
38 | | -use crate::utils::json::flatten::{convert_to_array, JsonFlattenError}; |
39 | 21 | use actix_web::web::{Json, Path}; |
40 | 22 | use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; |
41 | 23 | use arrow_array::RecordBatch; |
| 24 | +use bytes::Bytes; |
42 | 25 | use chrono::Utc; |
43 | 26 | use http::StatusCode; |
44 | 27 | use serde_json::Value; |
45 | 28 |
|
46 | 29 | use crate::event::error::EventError; |
47 | 30 | use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; |
48 | | -use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; |
| 31 | +use crate::event::format::{LogSource, LogSourceEntry}; |
49 | 32 | use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; |
50 | 33 | use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; |
51 | | -use crate::metadata::SchemaVersion; |
52 | 34 | use crate::option::Mode; |
53 | 35 | use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; |
54 | 36 | use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; |
@@ -138,26 +120,11 @@ pub async fn ingest( |
138 | 120 | } |
139 | 121 |
|
140 | 122 | pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> { |
141 | | - let size: usize = body.len(); |
142 | 123 | let json: Value = serde_json::from_slice(&body)?; |
143 | | - let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); |
144 | 124 | let mut p_custom_fields = HashMap::new(); |
145 | 125 | p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); |
146 | 126 | p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); |
147 | | - // For internal streams, use old schema |
148 | | - format::json::Event::new(json) |
149 | | - .into_event( |
150 | | - stream_name, |
151 | | - size as u64, |
152 | | - &schema, |
153 | | - false, |
154 | | - None, |
155 | | - None, |
156 | | - SchemaVersion::V0, |
157 | | - StreamType::Internal, |
158 | | - &p_custom_fields, |
159 | | - )? |
160 | | - .process()?; |
| 127 | + flatten_and_push_logs(json, &stream_name, &LogSource::Pmeta, &p_custom_fields).await?; |
161 | 128 |
|
162 | 129 | Ok(()) |
163 | 130 | } |
|
0 commit comments