Commit a44b530

Use Arrow Schema instead of custom schema type. (#94)
Replace the custom schema type with an Arrow schema wrapped in `Option`. If metadata is able to load a schema, it is the `Some` variant; if there is no schema (a new stream), the `None` variant is used. This reduces the number of times the schema is serialised, and the schema is passed directly on to future event processing and queries.
1 parent d83f099 commit a44b530
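
The core idea of the change, as a minimal sketch (the struct mirrors this commit's `LogStreamMetadata`; the helper and `main` are illustrative, not code from the commit):

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Sketch: a stream's schema is now the Arrow type itself, wrapped in
// Option. None marks a new stream that has not received its first event.
struct LogStreamMetadata {
    schema: Option<Schema>,
}

// Illustrative helper: "is this the first event?" is now just a None
// check, with no string (de)serialisation involved.
fn is_first_event(meta: &LogStreamMetadata) -> bool {
    meta.schema.is_none()
}

fn main() {
    let new_stream = LogStreamMetadata { schema: None };
    assert!(is_first_event(&new_stream));

    let known_stream = LogStreamMetadata {
        schema: Some(Schema::new(vec![Field::new("a", DataType::Int64, false)])),
    };
    assert!(!is_first_event(&known_stream));
}
```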

6 files changed: +89 -113 lines changed

server/src/event.rs

Lines changed: 19 additions & 30 deletions
```diff
@@ -17,6 +17,7 @@
  *
  */
 use datafusion::arrow;
+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -41,10 +42,6 @@ pub struct Event {
 }
 
 // Events holds the schema related to a each event for a single log stream
-pub struct Schema {
-    pub arrow_schema: arrow::datatypes::Schema,
-    pub string_schema: String,
-}
 
 impl Event {
     fn data_file_path(&self) -> String {
@@ -62,31 +59,29 @@ impl Event {
         &self,
         storage: &impl ObjectStorage,
     ) -> Result<response::EventResponse, Error> {
-        let Schema {
-            arrow_schema,
-            string_schema,
-        } = self.infer_schema().map_err(|e| {
+        let inferred_schema = self.infer_schema().map_err(|e| {
             error!("Failed to infer schema for event. {:?}", e);
             e
         })?;
 
-        let event = self.get_reader(arrow_schema);
+        let event = self.get_reader(inferred_schema.clone());
         let size = self.body_size();
 
         let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
-        let is_first_event = stream_schema.is_empty();
-        // if stream schema is empty then it is first event.
-        let compressed_size = if is_first_event {
-            // process first event and store schema in obect store
-            self.process_first_event(event, string_schema.clone(), storage)
-                .await?
-        } else {
+        let is_first_event = stream_schema.is_none();
+
+        let compressed_size = if let Some(existing_schema) = stream_schema {
             // validate schema before processing the event
-            if stream_schema != string_schema {
+            if existing_schema != inferred_schema {
                 return Err(Error::SchemaMismatch(self.stream_name.clone()));
             } else {
                 self.process_event(event)?
             }
+        } else {
+            // if stream schema is none then it is first event,
+            // process first event and store schema in obect store
+            self.process_first_event(event, inferred_schema, storage)
+                .await?
         };
 
         if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
@@ -116,7 +111,7 @@ impl Event {
     async fn process_first_event<R: std::io::Read>(
         &self,
         mut event: json::Reader<R>,
-        string_schema: String,
+        schema: Schema,
         storage: &impl ObjectStorage,
     ) -> Result<u64, Error> {
         let rb = event.next()?.ok_or(Error::MissingRecord)?;
@@ -126,8 +121,9 @@ impl Event {
 
         // Put the inferred schema to object store
         let stream_name = &self.stream_name;
+
         storage
-            .put_schema(stream_name.clone(), string_schema.clone())
+            .put_schema(stream_name.clone(), &schema)
             .await
             .map_err(|e| response::EventError {
                 msg: format!(
@@ -138,7 +134,7 @@ impl Event {
 
         // set the schema in memory for this stream
         metadata::STREAM_INFO
-            .set_schema(self.stream_name.clone(), string_schema)
+            .set_schema(&self.stream_name, schema)
            .map_err(|e| response::EventError {
                 msg: format!(
                     "Failed to set schema for log stream {} due to err: {}",
@@ -180,12 +176,8 @@ impl Event {
         let reader = self.body.as_bytes();
         let mut buf_reader = BufReader::new(reader);
         let inferred_schema = infer_json_schema(&mut buf_reader, None)?;
-        let str_inferred_schema = serde_json::to_string(&inferred_schema)?;
 
-        Ok(Schema {
-            arrow_schema: inferred_schema,
-            string_schema: str_inferred_schema,
-        })
+        Ok(inferred_schema)
     }
 
     fn get_reader(&self, arrow_schema: arrow::datatypes::Schema) -> json::Reader<&[u8]> {
@@ -206,11 +198,8 @@ impl Event {
         let parquet_path = self.data_file_path();
         let parquet_file = fs::File::create(&parquet_path)?;
         let props = WriterProperties::builder().build();
-        let mut writer = ArrowWriter::try_new(
-            parquet_file,
-            Arc::new(self.infer_schema()?.arrow_schema),
-            Some(props),
-        )?;
+        let mut writer =
+            ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?;
         writer.write(&rb)?;
         writer.close()?;
```

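The string comparison is gone because Arrow's `Schema` implements `PartialEq`, so the inferred schema can be checked against the stored one directly. A small sketch of that validation step (the function and `main` are illustrative, not code from this commit):

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Illustrative validation mirroring the new branch above: compare Arrow
// schemas directly instead of comparing their JSON string forms.
fn validate_schema(existing: &Schema, inferred: &Schema) -> Result<(), String> {
    if existing != inferred {
        return Err("schema mismatch".to_string());
    }
    Ok(())
}

fn main() {
    let a = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let b = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let c = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);

    assert!(validate_schema(&a, &b).is_ok());
    assert!(validate_schema(&a, &c).is_err());
}
```
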
server/src/handlers/logstream.rs

Lines changed: 13 additions & 16 deletions
```diff
@@ -92,25 +92,24 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
 
     match metadata::STREAM_INFO.schema(&stream_name) {
         Ok(schema) => response::ServerResponse {
-            msg: schema,
+            msg: schema
+                .map(|schema| schema.to_json().to_string())
+                .unwrap_or_default(),
             code: StatusCode::OK,
         }
         .to_http(),
         Err(_) => match S3::new().get_schema(&stream_name).await {
-            Ok(schema) if schema.is_empty() => response::ServerResponse {
+            Ok(None) => response::ServerResponse {
                 msg: "log stream is not initialized, please post an event before fetching schema"
                     .to_string(),
                 code: StatusCode::BAD_REQUEST,
             }
             .to_http(),
-            Ok(schema) => {
-                let buf = schema.as_ref();
-                response::ServerResponse {
-                    msg: String::from_utf8(buf.to_vec()).unwrap(),
-                    code: StatusCode::OK,
-                }
-                .to_http()
+            Ok(Some(schema)) => response::ServerResponse {
+                msg: serde_json::from_value(schema.to_json()).unwrap(),
+                code: StatusCode::OK,
             }
+            .to_http(),
             Err(_) => response::ServerResponse {
                 msg: "failed to get log stream schema, because log stream doesn't exist"
                     .to_string(),
@@ -124,7 +123,7 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
 pub async fn get_alert(req: HttpRequest) -> HttpResponse {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    match metadata::STREAM_INFO.alert(stream_name.clone()) {
+    match metadata::STREAM_INFO.alert(&stream_name) {
         Ok(alerts) => response::ServerResponse {
             msg: serde_json::to_string(&alerts).unwrap(),
             code: StatusCode::OK,
@@ -166,11 +165,9 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
 
     // Proceed to create log stream if it doesn't exist
     if s3.get_schema(&stream_name).await.is_err() {
-        if let Err(e) = metadata::STREAM_INFO.add_stream(
-            stream_name.to_string(),
-            "".to_string(),
-            Default::default(),
-        ) {
+        if let Err(e) =
+            metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Default::default())
+        {
             return response::ServerResponse {
                 msg: format!(
                     "failed to create log stream {} due to error: {}",
@@ -249,7 +246,7 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
         .to_http();
     }
 
-    if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) {
+    if let Err(e) = metadata::STREAM_INFO.set_alert(&stream_name, alerts) {
         return response::ServerResponse {
             msg: format!(
                 "failed to set alert configuration for log stream {} due to err: {}",
```

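The handler now branches on the `Option` rather than on an empty string. A hedged sketch of the response mapping, using the `Schema::to_json()` helper this diff calls (available in the arrow version this commit builds against; it returns a `serde_json::Value`). The `schema_response` function is illustrative only:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Illustrative mapping: None -> 400 with a hint to post an event first,
// Some -> 200 with the schema rendered as JSON.
fn schema_response(schema: Option<Schema>) -> (u16, String) {
    match schema {
        Some(schema) => (200, schema.to_json().to_string()),
        None => (
            400,
            "log stream is not initialized, please post an event before fetching schema"
                .to_string(),
        ),
    }
}

fn main() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let (code, body) = schema_response(Some(schema));
    println!("{}: {}", code, body);
}
```
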
server/src/metadata.rs

Lines changed: 37 additions & 50 deletions
```diff
@@ -16,7 +16,7 @@
  *
  */
 
-use bytes::Bytes;
+use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
 use log::error;
 use serde::{Deserialize, Serialize};
@@ -30,7 +30,7 @@ use crate::storage::ObjectStorage;
 
 #[derive(Debug, Default, Clone, PartialEq, Eq)]
 pub struct LogStreamMetadata {
-    pub schema: String,
+    pub schema: Option<Schema>,
     pub alerts: Alerts,
     pub stats: Stats,
 }
@@ -85,38 +85,42 @@ impl STREAM_INFO {
         Ok(())
     }
 
-    pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> {
-        let alerts = self.alert(stream_name.clone())?;
-        self.add_stream(stream_name, schema, alerts)
+    pub fn set_schema(&self, stream_name: &str, schema: Schema) -> Result<(), Error> {
+        let mut map = self.write().unwrap();
+        map.get_mut(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.schema.replace(schema);
+            })
     }
 
-    pub fn schema(&self, stream_name: &str) -> Result<String, Error> {
+    pub fn schema(&self, stream_name: &str) -> Result<Option<Schema>, Error> {
         let map = self.read().unwrap();
-        let meta = map
-            .get(stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))?;
-
-        Ok(meta.schema.clone())
+        map.get(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.schema.to_owned())
     }
 
-    pub fn set_alert(&self, stream_name: String, alerts: Alerts) -> Result<(), Error> {
-        let schema = self.schema(&stream_name)?;
-        self.add_stream(stream_name, schema, alerts)
+    pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), Error> {
+        let mut map = self.write().unwrap();
+        map.get_mut(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.alerts = alerts;
+            })
     }
 
-    pub fn alert(&self, stream_name: String) -> Result<Alerts, Error> {
+    pub fn alert(&self, stream_name: &str) -> Result<Alerts, Error> {
         let map = self.read().unwrap();
-        let meta = map
-            .get(&stream_name)
-            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;
-
-        Ok(meta.alerts.clone())
+        map.get(stream_name)
+            .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))
+            .map(|metadata| metadata.alerts.to_owned())
     }
 
     pub fn add_stream(
         &self,
         stream_name: String,
-        schema: String,
+        schema: Option<Schema>,
         alerts: Alerts,
     ) -> Result<(), Error> {
         let mut map = self.write().unwrap();
@@ -153,8 +157,6 @@ impl STREAM_INFO {
         let schema = storage
             .get_schema(&stream.name)
             .await
-            .map_err(|e| e.into())
-            .and_then(parse_string)
             .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
 
         let metadata = LogStreamMetadata {
@@ -191,17 +193,21 @@ impl STREAM_INFO {
     }
 }
 
-fn parse_string(bytes: Bytes) -> Result<String, Error> {
-    String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
+    use datafusion::arrow::datatypes::{DataType, Field};
     use maplit::hashmap;
     use rstest::*;
     use serial_test::serial;
 
+    #[fixture]
+    fn schema() -> Schema {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Boolean, false);
+        Schema::new(vec![field_a, field_b])
+    }
+
     #[rstest]
     #[case::zero(0, 0, 0)]
     #[case::some(1024, 512, 2048)]
@@ -229,25 +235,10 @@ mod tests {
     }
 
     #[rstest]
-    #[case::nonempty_string("Hello world")]
-    #[case::empty_string("")]
-    fn test_parse_string(#[case] string: String) {
-        let bytes = Bytes::from(string);
-        assert!(parse_string(bytes).is_ok())
-    }
-
-    #[test]
-    fn test_bad_parse_string() {
-        let bad: Vec<u8> = vec![195, 40];
-        let bytes = Bytes::from(bad);
-        assert!(parse_string(bytes).is_err());
-    }
-
-    #[rstest]
-    #[case::stream_schema_alert("teststream", "schema")]
-    #[case::stream_only("teststream", "")]
+    #[case::stream_schema_alert("teststream", Some(schema()))]
+    #[case::stream_only("teststream", None)]
     #[serial]
-    fn test_add_stream(#[case] stream_name: String, #[case] schema: String) {
+    fn test_add_stream(#[case] stream_name: String, #[case] schema: Option<Schema>) {
         let alerts = Alerts { alerts: vec![] };
         clear_map();
         STREAM_INFO
@@ -271,11 +262,7 @@ mod tests {
     fn test_delete_stream(#[case] stream_name: String) {
         clear_map();
         STREAM_INFO
-            .add_stream(
-                stream_name.clone(),
-                "".to_string(),
-                Alerts { alerts: vec![] },
-            )
+            .add_stream(stream_name.clone(), None, Alerts { alerts: vec![] })
             .unwrap();
 
         STREAM_INFO.delete_stream(&stream_name).unwrap();
```

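`set_schema` and `set_alert` previously rebuilt the whole stream entry via `add_stream`; they now mutate the existing entry in place under the write lock. A simplified sketch of that pattern, assuming a bare `RwLock<HashMap<..>>` in place of the `STREAM_INFO` wrapper type:

```rust
use datafusion::arrow::datatypes::Schema;
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default)]
struct Metadata {
    schema: Option<Schema>,
}

// Sketch of the in-place update: look the stream up under the write lock
// and replace its schema, instead of re-inserting a rebuilt entry.
fn set_schema(
    map: &RwLock<HashMap<String, Metadata>>,
    stream_name: &str,
    schema: Schema,
) -> Result<(), String> {
    let mut map = map.write().unwrap();
    map.get_mut(stream_name)
        .ok_or_else(|| format!("stream {} not found", stream_name))
        .map(|meta| {
            // Option::replace stores the new schema, dropping any old one.
            meta.schema.replace(schema);
        })
}

fn main() {
    let map = RwLock::new(HashMap::from([("logs".to_string(), Metadata::default())]));
    assert!(set_schema(&map, "logs", Schema::empty()).is_ok());
    assert!(set_schema(&map, "missing", Schema::empty()).is_err());
}
```
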
server/src/query.rs

Lines changed: 5 additions & 7 deletions
```diff
@@ -17,7 +17,6 @@
  */
 
 use chrono::{DateTime, Utc};
-use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
@@ -94,13 +93,12 @@ impl Query {
             target_partitions: 1,
         };
 
-        let schema = &STREAM_INFO.schema(&self.stream_name)?;
+        let schema = STREAM_INFO.schema(&self.stream_name)?;
 
-        if schema.is_empty() {
-            return Ok(());
-        }
-
-        let schema: Arc<Schema> = Arc::new(serde_json::from_str(schema)?);
+        let schema = match schema {
+            Some(schema) => Arc::new(schema),
+            None => return Ok(()),
+        };
 
         ctx.register_listing_table(
             &self.stream_name,
```

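On the query side, `None` now means the stream has no events yet, so the query returns early instead of deserialising a schema string. A minimal sketch of that early-return pattern (`register_stream` is illustrative; the real code hands the `Arc<Schema>` to `ctx.register_listing_table`):

```rust
use datafusion::arrow::datatypes::Schema;
use std::sync::Arc;

// Illustrative early return: an uninitialised stream (None) registers no
// table; an initialised one passes its Arrow schema through directly,
// with no serde_json round-trip.
fn register_stream(schema: Option<Schema>) -> Result<(), String> {
    let schema = match schema {
        Some(schema) => Arc::new(schema),
        None => return Ok(()),
    };
    // In the real code, `schema` is used to register the listing table here.
    let _ = schema;
    Ok(())
}

fn main() {
    assert!(register_stream(None).is_ok());
    assert!(register_stream(Some(Schema::empty())).is_ok());
}
```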