Commit 13cc00b

feat: add feature to provide static schema in logstream creation

Set the header x-p-static-schema-flag: true and provide the schema in the request body in the following format:

{
    "fields": [
        {
            "name": "<name of the field>",
            "data_type": "<data type of the field, one of: int, double, float, boolean, string, datetime, string_list, int_list, double_list, float_list, boolean_list>"
        }
    ]
}

Once provided, the schema is persisted in storage and in the stream metadata. If a static schema was provided, the ingest API verifies that each event's schema matches the static schema given at stream creation; if the schemas do not match, ingestion is rejected.
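As a usage illustration (not part of the diff), creating a stream with a static schema could look like the following reqwest sketch. The PUT /api/v1/logstream/{name} endpoint is assumed from the server's existing API; host, credentials, stream name, and the two sample fields are placeholders:

```rust
// A minimal sketch, assuming the server's usual PUT /api/v1/logstream/{name}
// endpoint with basic auth. Host, credentials, stream name, and the sample
// fields are placeholders, not part of this commit.
use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    let schema = r#"{
        "fields": [
            { "name": "device_id", "data_type": "int" },
            { "name": "message",   "data_type": "string" }
        ]
    }"#;

    let resp = Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .header("x-p-static-schema-flag", "true")
        .basic_auth("admin", Some("admin"))
        .body(schema)
        .send()?;

    println!("create stream: {}", resp.status());
    Ok(())
}
```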
1 parent dc9e0c5 commit 13cc00b

9 files changed (+331, -27 lines)

server/src/event/format.rs

Lines changed: 38 additions & 5 deletions

@@ -46,10 +46,12 @@ pub trait EventFormat: Sized {
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
     fn into_recordbatch(
         self,
-        schema: HashMap<String, Arc<Field>>,
+        storage_schema: HashMap<String, Arc<Field>>,
         time_partition: Option<String>,
+        static_schema_flag: Option<String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first, tags, metadata) = self.to_data(schema, time_partition)?;
+        let (data, mut schema, is_first, tags, metadata) =
+            self.to_data(storage_schema.clone(), time_partition)?;

         if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
             return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
@@ -90,19 +92,50 @@
         )));

         // prepare the record batch and new fields to be added
-        let schema = Arc::new(Schema::new(schema));
-        let rb = Self::decode(data, schema.clone())?;
+        let new_schema = Arc::new(Schema::new(schema));
+        if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
+            return Err(anyhow!("Schema mismatch"));
+        }
+        let rb = Self::decode(data, new_schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
         // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
-            Arc::clone(&schema),
+            Arc::clone(&new_schema),
             &rb,
             &[tags_index, metadata_index],
             &[Arc::new(tags_arr), Arc::new(metadata_arr)],
         );

         Ok((rb, is_first))
     }
+
+    fn is_schema_matching(
+        new_schema: Arc<Schema>,
+        storage_schema: HashMap<String, Arc<Field>>,
+        static_schema_flag: Option<String>,
+    ) -> bool {
+        if static_schema_flag.is_none() {
+            return true;
+        }
+        for (field_name, field) in new_schema
+            .fields()
+            .iter()
+            .map(|field| (field.name().to_owned(), field.clone()))
+            .collect::<HashMap<String, Arc<Field>>>()
+        {
+            if let Some(storage_field) = storage_schema.get(&field_name) {
+                if field_name != *storage_field.name() {
+                    return false;
+                }
+                if field.data_type() != storage_field.data_type() {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+        true
+    }
 }
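In effect, ingestion into a static-schema stream succeeds only when every field inferred from the incoming event already exists in the stored schema under the same name and data type; extra or retyped fields fail the check. A minimal standalone sketch of that rule, assuming only arrow_schema (the helper name and sample fields are illustrative, not code from this commit):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

// Standalone restatement of the rule enforced by is_schema_matching above:
// every field inferred from an incoming event must already exist in the
// stored schema with the same name and data type.
fn matches_static_schema(
    new_schema: &Schema,
    storage_schema: &HashMap<String, Arc<Field>>,
) -> bool {
    new_schema.fields().iter().all(|field| {
        storage_schema
            .get(field.name())
            .map_or(false, |stored| stored.data_type() == field.data_type())
    })
}

fn main() {
    let stored: HashMap<String, Arc<Field>> = HashMap::from([
        ("id".to_string(), Arc::new(Field::new("id", DataType::Int64, true))),
        ("msg".to_string(), Arc::new(Field::new("msg", DataType::Utf8, true))),
    ]);

    // A matching event passes; an event with an unknown field is rejected.
    let ok = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
    let bad = Schema::new(vec![Field::new("extra", DataType::Utf8, true)]);
    assert!(matches_static_schema(&ok, &stored));
    assert!(!matches_static_schema(&bad, &stored));
}
```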

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
 const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
 const TIME_PARTITION_KEY: &str = "x-p-time-partition";
+const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
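This hunk only introduces the header constant; the handler that reads it is not part of the shown diff. A hedged sketch of what that extraction might look like in actix-web, with a hypothetical helper:

```rust
use actix_web::HttpRequest;

const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";

// Hypothetical helper (not part of this diff): pull the flag off the
// stream-creation request. The real extraction lives in the logstream
// handler, which this commit view does not show.
fn static_schema_flag(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get(STATIC_SCHEMA_FLAG)
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
}
```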

server/src/handlers/http/ingest.rs

Lines changed: 30 additions & 11 deletions

@@ -16,25 +16,26 @@
  *
  */

-use crate::event::error::EventError;
-use crate::event::format::EventFormat;
-use crate::event::{self, format};
+use super::logstream::error::CreateStreamError;
+use super::{kinesis, otel};
+use crate::event::{
+    self,
+    error::EventError,
+    format::{self, EventFormat},
+};
 use crate::handlers::{
     LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
     STREAM_NAME_HEADER_KEY,
 };
 use crate::metadata::STREAM_INFO;
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
-use arrow_schema::Field;
+use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
-
-use super::logstream::error::CreateStreamError;
-use super::{kinesis, otel};
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
@@ -99,14 +100,18 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
             .ok_or(PostError::StreamNotFound(stream_name.clone()))?
             .schema
             .clone();
-
         let time_partition = hash_map
             .get(&stream_name)
             .ok_or(PostError::StreamNotFound(stream_name.clone()))?
             .time_partition
             .clone();
+        let static_schema_flag = hash_map
+            .get(&stream_name)
+            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
+            .static_schema_flag
+            .clone();

-        into_event_batch(req, body, schema, time_partition)?
+        into_event_batch(req, body, schema, time_partition, static_schema_flag)?
     };

     event::Event {
@@ -127,6 +132,7 @@ fn into_event_batch(
     body: Bytes,
     schema: HashMap<String, Arc<Field>>,
     time_partition: Option<String>,
+    static_schema_flag: Option<String>,
 ) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
@@ -137,7 +143,7 @@
         tags,
         metadata,
     };
-    let (rb, is_first) = event.into_recordbatch(schema, time_partition)?;
+    let (rb, is_first) = event.into_recordbatch(schema, time_partition, static_schema_flag)?;
     Ok((size, rb, is_first))
 }

@@ -146,7 +152,8 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
-    super::logstream::create_stream(stream_name.to_string(), "").await?;
+    super::logstream::create_stream(stream_name.to_string(), "", "", Arc::new(Schema::empty()))
+        .await?;
     Ok(())
 }

@@ -250,6 +257,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();

@@ -297,6 +305,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();

@@ -335,6 +344,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             schema,
             None,
+            None,
         )
         .unwrap();

@@ -372,6 +382,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             schema,
+            None,
             None
         )
         .is_err());
@@ -397,6 +408,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             schema,
             None,
+            None,
         )
         .unwrap();

@@ -414,6 +426,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
+            None,
             None
         )
         .is_err())
@@ -444,6 +457,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();

@@ -498,6 +512,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();

@@ -552,6 +567,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             schema,
             None,
+            None,
         )
         .unwrap();

@@ -598,6 +614,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();

@@ -648,6 +665,7 @@ mod tests {
             req,
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             schema,
+            None,
             None
         )
         .is_err());
@@ -683,6 +701,7 @@ mod tests {
             Bytes::from(serde_json::to_vec(&json).unwrap()),
             HashMap::default(),
             None,
+            None,
         )
         .unwrap();
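Putting the pieces together, an event whose fields stray from the declared schema should be rejected by POST /api/v1/ingest (the endpoint named in the handler comment above). A client-side sketch, reusing the placeholder stream from the creation example; host and credentials are again assumptions:

```rust
// Client-side sketch of the rejection path: the stream "demo" was created
// with the static schema above, and "level" is not one of its fields, so
// the server should refuse the event. Host and credentials are placeholders.
use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    let event = r#"{ "device_id": 1, "message": "boot", "level": "info" }"#;

    let resp = Client::new()
        .post("http://localhost:8000/api/v1/ingest")
        .header("x-p-stream", "demo")
        .header("Content-Type", "application/json")
        .basic_auth("admin", Some("admin"))
        .body(event)
        .send()?;

    // Expect a non-success status because the event schema does not match.
    println!("ingest: {}", resp.status());
    Ok(())
}
```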
