Commit aad03fd

Author: Devdutt Shenoi
Merge branch 'main' into disk-writer
2 parents: 58b4200 + db4a68d

File tree: 17 files changed, +660 -168 lines

src/connectors/kafka/processor.rs
Lines changed: 7 additions & 2 deletions

```diff
@@ -28,7 +28,7 @@ use tracing::{debug, error};
 use crate::{
     connectors::common::processor::Processor,
     event::{
-        format::{json, EventFormat, LogSource},
+        format::{json, EventFormat, LogSourceEntry},
         Event as ParseableEvent,
     },
     parseable::PARSEABLE,
@@ -49,9 +49,14 @@ impl ParseableSinkProcessor {
             .first()
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
+        let log_source_entry = LogSourceEntry::default();
 
         PARSEABLE
-            .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
+            .create_stream_if_not_exists(
+                stream_name,
+                StreamType::UserDefined,
+                vec![log_source_entry],
+            )
             .await?;
 
         let stream = PARSEABLE.get_stream(stream_name)?;
```
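
Note: for orientation, here is a minimal standalone sketch of what the default entry above carries. It assumes `Json` is the `#[default]` variant of `LogSource` (the default variant is not visible in this diff) and uses reduced stand-ins for the real types:

```rust
use std::collections::HashSet;

// Reduced stand-ins for the real types in src/event/format/mod.rs.
#[derive(Debug, Default)]
#[allow(dead_code)]
enum LogSource {
    #[default]
    Json, // assumed to be the default variant
    Kinesis,
}

#[derive(Debug, Default)]
struct LogSourceEntry {
    log_source_format: LogSource,
    fields: HashSet<String>,
}

fn main() {
    // The Kafka processor registers the stream with a single default entry:
    // the default format and no known fields.
    let log_sources = vec![LogSourceEntry::default()];
    println!("{log_sources:?}");
}
```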

src/event/format/json.rs
Lines changed: 105 additions & 46 deletions

```diff
@@ -23,7 +23,7 @@ use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
 use arrow_schema::{DataType, Field, Fields, Schema};
-use chrono::{DateTime, NaiveDateTime, Utc};
+use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use itertools::Itertools;
 use serde_json::Value;
@@ -62,6 +62,7 @@ impl EventFormat for Event {
         schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        static_schema_flag: bool,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
         let stream_schema = schema;
 
@@ -111,7 +112,7 @@
 
         if value_arr
             .iter()
-            .any(|value| fields_mismatch(&schema, value, schema_version))
+            .any(|value| fields_mismatch(&schema, value, schema_version, static_schema_flag))
         {
             return Err(anyhow!(
                 "Could not process this event due to mismatch in datatype"
@@ -253,73 +254,131 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
     Ok(keys)
 }
 
-fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion) -> bool {
+fn fields_mismatch(
+    schema: &[Arc<Field>],
+    body: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
     for (name, val) in body.as_object().expect("body is of object variant") {
         if val.is_null() {
             continue;
         }
         let Some(field) = get_field(schema, name) else {
             return true;
         };
-        if !valid_type(field.data_type(), val, schema_version) {
+        if !valid_type(field, val, schema_version, static_schema_flag) {
             return true;
         }
     }
     false
 }
 
-fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
-    match data_type {
+fn valid_type(
+    field: &Field,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    match field.data_type() {
         DataType::Boolean => value.is_boolean(),
-        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
+            validate_int(value, static_schema_flag)
+        }
         DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
         DataType::Float16 | DataType::Float32 => value.is_f64(),
-        // All numbers can be cast as Float64 from schema version v1
-        DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
-        DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
+        DataType::Float64 => validate_float(value, schema_version, static_schema_flag),
         DataType::Utf8 => value.is_string(),
-        DataType::List(field) => {
-            let data_type = field.data_type();
-            if let Value::Array(arr) = value {
-                for elem in arr {
-                    if elem.is_null() {
-                        continue;
-                    }
-                    if !valid_type(data_type, elem, schema_version) {
-                        return false;
-                    }
-                }
-            }
-            true
-        }
+        DataType::List(field) => validate_list(field, value, schema_version, static_schema_flag),
         DataType::Struct(fields) => {
-            if let Value::Object(val) = value {
-                for (key, value) in val {
-                    let field = (0..fields.len())
-                        .find(|idx| fields[*idx].name() == key)
-                        .map(|idx| &fields[idx]);
-
-                    if let Some(field) = field {
-                        if value.is_null() {
-                            continue;
-                        }
-                        if !valid_type(field.data_type(), value, schema_version) {
-                            return false;
-                        }
-                    } else {
-                        return false;
-                    }
-                }
-                true
-            } else {
-                false
+            validate_struct(fields, value, schema_version, static_schema_flag)
+        }
+        DataType::Date32 => {
+            if let Value::String(s) = value {
+                return NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok();
             }
+            false
         }
         DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
         _ => {
-            error!("Unsupported datatype {:?}, value {:?}", data_type, value);
-            unreachable!()
+            error!(
+                "Unsupported datatype {:?}, value {:?}",
+                field.data_type(),
+                value
+            );
+            false
+        }
+    }
+}
+
+fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
+    // allow casting string to int for static schema
+    if static_schema_flag {
+        if let Value::String(s) = value {
+            return s.trim().parse::<i64>().is_ok();
+        }
+    }
+    value.is_i64()
+}
+
+fn validate_float(value: &Value, schema_version: SchemaVersion, static_schema_flag: bool) -> bool {
+    // allow casting string to float for static schema
+    if static_schema_flag {
+        if let Value::String(s) = value.clone() {
+            let trimmed = s.trim();
+            return trimmed.parse::<f64>().is_ok() || trimmed.parse::<i64>().is_ok();
+        }
+        return value.is_number();
+    }
+    match schema_version {
+        SchemaVersion::V1 => value.is_number(),
+        _ => value.is_f64(),
+    }
+}
+
+fn validate_list(
+    field: &Field,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    if let Value::Array(arr) = value {
+        for elem in arr {
+            if elem.is_null() {
+                continue;
+            }
+            if !valid_type(field, elem, schema_version, static_schema_flag) {
+                return false;
+            }
+        }
+    }
+    true
+}
+
+fn validate_struct(
+    fields: &Fields,
+    value: &Value,
+    schema_version: SchemaVersion,
+    static_schema_flag: bool,
+) -> bool {
+    if let Value::Object(val) = value {
+        for (key, value) in val {
+            let field = fields.iter().find(|f| f.name() == key);
+
+            if let Some(field) = field {
+                if value.is_null() {
+                    continue;
+                }
+                if !valid_type(field, value, schema_version, static_schema_flag) {
+                    return false;
+                }
+            } else {
+                return false;
+            }
         }
+        true
+    } else {
+        false
     }
 }
```
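
To see what the new `static_schema_flag` changes in practice, here is a small self-contained sketch that copies `validate_int` from the diff above and exercises it against `serde_json` values (it assumes only `serde_json` as a dependency and does not call into Parseable itself):

```rust
use serde_json::{json, Value};

// Copy of validate_int from the diff: with a static schema, a string that
// parses as i64 is accepted; otherwise the value must already be an i64.
fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
    if static_schema_flag {
        if let Value::String(s) = value {
            return s.trim().parse::<i64>().is_ok();
        }
    }
    value.is_i64()
}

fn main() {
    let quoted = json!(" 42 ");
    let raw = json!(42);

    // Dynamic schema: only a real JSON number passes.
    assert!(!validate_int(&quoted, false));
    assert!(validate_int(&raw, false));

    // Static schema: the quoted (and even whitespace-padded) form passes too.
    assert!(validate_int(&quoted, true));
    assert!(validate_int(&raw, true));
}
```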

src/event/format/mod.rs
Lines changed: 25 additions & 3 deletions

```diff
@@ -44,7 +44,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub enum LogSource {
     // AWS Kinesis sends logs in the format of a json array
     Kinesis,
@@ -92,6 +92,23 @@
     }
 }
 
+/// Contains the format name and a list of known field names that are associated with the said format.
+/// Stored on disk as part of `ObjectStoreFormat` in stream.json
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LogSourceEntry {
+    pub log_source_format: LogSource,
+    pub fields: HashSet<String>,
+}
+
+impl LogSourceEntry {
+    pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
+        LogSourceEntry {
+            log_source_format,
+            fields,
+        }
+    }
+}
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
@@ -102,6 +119,7 @@ pub trait EventFormat: Sized {
         schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        static_schema_flag: bool,
     ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -117,8 +135,12 @@
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) =
-            self.to_data(storage_schema, time_partition, schema_version)?;
+        let (data, mut schema, is_first) = self.to_data(
+            storage_schema,
+            time_partition,
+            schema_version,
+            static_schema_flag,
+        )?;
 
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
```
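
A usage sketch for the new struct, using reduced stand-in types and made-up field names (the real known-field lists live in the otel modules, not here):

```rust
use std::collections::HashSet;

// Reduced stand-ins mirroring the LogSourceEntry introduced above.
#[derive(Debug, PartialEq, Eq)]
#[allow(dead_code)]
enum LogSource {
    Json,
    OtelLogs,
}

#[derive(Debug)]
struct LogSourceEntry {
    log_source_format: LogSource,
    fields: HashSet<String>,
}

impl LogSourceEntry {
    fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
        LogSourceEntry {
            log_source_format,
            fields,
        }
    }
}

fn main() {
    // Hypothetical field names, purely for illustration.
    let fields: HashSet<String> = ["body", "severity_text", "body"]
        .iter()
        .map(|&s| s.to_string())
        .collect();

    let entry = LogSourceEntry::new(LogSource::OtelLogs, fields);
    // HashSet semantics: the duplicate "body" collapses, so two fields remain.
    assert_eq!(entry.fields.len(), 2);
    assert_eq!(entry.log_source_format, LogSource::OtelLogs);
}
```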

src/handlers/http/ingest.rs
Lines changed: 48 additions & 8 deletions

```diff
@@ -16,7 +16,7 @@
  *
  */
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -28,10 +28,13 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
+use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
+use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
+use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
@@ -55,9 +58,6 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
     if internal_stream_names.contains(&stream_name) {
         return Err(PostError::InternalStream(stream_name));
     }
-    PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::default())
-        .await?;
 
     let log_source = req
         .headers()
@@ -72,6 +72,15 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
+    let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
+    PARSEABLE
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
+        .await?;
+
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
@@ -119,8 +128,20 @@ pub async fn handle_otel_logs_ingestion(
     }
 
     let stream_name = stream_name.to_str().unwrap().to_owned();
+
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_LOG_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;
 
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
@@ -146,11 +167,18 @@ pub async fn handle_otel_metrics_ingestion(
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_METRICS_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
-            LogSource::OtelMetrics,
+            vec![log_source_entry],
         )
         .await?;
 
@@ -178,8 +206,20 @@ pub async fn handle_otel_traces_ingestion(
         return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_TRACES_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
+
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;
 
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
```
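
The three OTEL handlers share one pattern: resolve the log source from the request, pair it with that source's known-field list, and register the stream before pushing logs. Below is a sketch of the list-to-set conversion they all use; the constant is a stand-in with made-up names, not the real contents of `OTEL_LOG_KNOWN_FIELD_LIST`:

```rust
use std::collections::HashSet;

// Stand-in for the real constants in crate::otel::{logs, metrics, traces};
// the actual field names and list lengths are defined there, not shown here.
const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 3] = ["severity_text", "body", "time_unix_nano"];

// The same iterator pattern the handlers use to turn a static &str list
// into the owned HashSet<String> that LogSourceEntry::new expects.
fn known_fields(list: &[&str]) -> HashSet<String> {
    list.iter().map(|&s| s.to_string()).collect()
}

fn main() {
    let fields = known_fields(&OTEL_LOG_KNOWN_FIELD_LIST);
    assert!(fields.contains("body"));
    assert_eq!(fields.len(), 3);
}
```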
