Skip to content
69 changes: 30 additions & 39 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
},
metadata::{LogStreamMetadata, SchemaVersion},
option::Mode,
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
static_schema::StaticSchema,
storage::{
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
ObjectStoreFormat, Owner, Permisssion, StreamType,
Expand Down Expand Up @@ -462,13 +462,35 @@ impl Parseable {
validate_custom_partition(custom_partition)?;
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
custom_partition.as_ref(),
static_schema_flag,
)?;
if !time_partition.is_empty() && custom_partition.is_some() {
return Err(StreamError::Custom {
msg: "Cannot set both time partition and custom partition".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let schema = if static_schema_flag {
if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
}.into());
}

let static_schema: StaticSchema = serde_json::from_slice(body)?;
static_schema
.convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
.map_err(|err| CreateStreamError::Custom {
msg: format!(
"Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
),
status: StatusCode::BAD_REQUEST,
})?
} else {
Arc::new(Schema::empty())
};

self.create_stream(
stream_name.to_string(),
Expand Down Expand Up @@ -761,37 +783,6 @@ impl Parseable {
}
}

/// Parses and validates the request body as a static schema for `stream_name`.
///
/// Returns an empty Arrow schema when `static_schema_flag` is unset (dynamic
/// streams start with no fields). Otherwise the body must be non-empty JSON
/// describing the schema; parse/convert failures become BAD_REQUEST errors.
pub fn validate_static_schema(
    body: &Bytes,
    stream_name: &str,
    time_partition: &str,
    custom_partition: Option<&String>,
    static_schema_flag: bool,
) -> Result<Arc<Schema>, CreateStreamError> {
    // Dynamic-schema streams begin life with an empty schema.
    if !static_schema_flag {
        return Ok(Arc::new(Schema::empty()));
    }

    // A static-schema stream is unusable without a schema in the body.
    if body.is_empty() {
        return Err(CreateStreamError::Custom {
            msg: format!(
                "Please provide schema in the request body for static schema logstream {stream_name}"
            ),
            status: StatusCode::BAD_REQUEST,
        });
    }

    // Deserialize the user schema, then convert it to Arrow form; any
    // conversion failure is surfaced to the client as a 400.
    let static_schema: StaticSchema = serde_json::from_slice(body)?;
    convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition).map_err(
        |_| CreateStreamError::Custom {
            msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
            status: StatusCode::BAD_REQUEST,
        },
    )
}

pub fn validate_time_partition_limit(
time_partition_limit: &str,
) -> Result<NonZeroU32, CreateStreamError> {
Expand Down
286 changes: 99 additions & 187 deletions src/static_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,184 +16,17 @@
*
*/

use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::utils::arrow::get_field;
use serde::{Deserialize, Serialize};
use std::str;
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

/// User-provided static schema: the flat list of fields a stream declares up front.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StaticSchema {
    // Declared fields, in the order the user supplied them.
    fields: Vec<SchemaFields>,
}

/// One field of a user-declared static schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaFields {
    // Column name as given by the user.
    name: String,
    // Type name as a string (e.g. "int", "string", "datetime", "string_list").
    data_type: String,
}

/// Intermediate, Arrow-typed representation of a static schema, built while
/// converting the user-supplied `StaticSchema` into an `arrow_schema::Schema`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedSchema {
    pub fields: Vec<Fields>,
    pub metadata: HashMap<String, String>,
}

/// One Arrow-typed field of a `ParsedSchema`; mirrors the pieces of
/// `arrow_schema::Field` this crate cares about.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Fields {
    name: String,
    data_type: DataType,
    nullable: bool,
    dict_id: i64,
    dict_is_ordered: bool,
    metadata: HashMap<String, String>,
}

/// Placeholder for schema-level metadata; currently carries no data.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Metadata {}
/// Converts a user-supplied static schema into an Arrow schema.
///
/// Rules enforced:
/// - every comma-separated custom partition column must be declared in the schema
/// - field names must be non-empty and unique (via `validate_field_names`)
/// - when `time_partition` is non-empty, a field of that name must exist and its
///   type is forced to `datetime`
///
/// On success the reserved `p_timestamp` column is prepended by
/// `add_parseable_fields_to_static_schema`.
pub fn convert_static_schema_to_arrow_schema(
    static_schema: StaticSchema,
    time_partition: &str,
    custom_partition: Option<&String>,
) -> Result<Arc<Schema>, StaticSchemaError> {
    let mut parsed_schema = ParsedSchema {
        fields: Vec::new(),
        metadata: HashMap::new(),
    };

    // Every custom partition column must be declared in the schema. A direct
    // membership scan replaces the previous two-pass HashMap bookkeeping.
    if let Some(custom_partition) = custom_partition {
        for partition in custom_partition.split(',') {
            if !static_schema
                .fields
                .iter()
                .any(|field| field.name == partition)
            {
                return Err(StaticSchemaError::MissingCustomPartition(
                    partition.to_string(),
                ));
            }
        }
    }

    let mut time_partition_exists = false;
    let mut existing_field_names: HashSet<String> = HashSet::new();

    for mut field in static_schema.fields {
        // Rejects empty and duplicate names.
        validate_field_names(&field.name, &mut existing_field_names)?;

        // The time-partition column is always stored as a timestamp.
        if !time_partition.is_empty() && field.name == time_partition {
            time_partition_exists = true;
            field.data_type = "datetime".to_string();
        }

        let data_type = match field.data_type.as_str() {
            "int" => DataType::Int64,
            "double" | "float" => DataType::Float64,
            "boolean" => DataType::Boolean,
            "string" => DataType::Utf8,
            "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
            "string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            "int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
            "double_list" | "float_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
            }
            "boolean_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
            }
            // Unknown type strings are tolerated and mapped to Null,
            // matching the original behavior.
            _ => DataType::Null,
        };

        parsed_schema.fields.push(Fields {
            name: field.name,
            data_type,
            nullable: default_nullable(),
            dict_id: default_dict_id(),
            dict_is_ordered: default_dict_is_ordered(),
            metadata: HashMap::new(),
        });
    }

    if !time_partition.is_empty() && !time_partition_exists {
        return Err(StaticSchemaError::MissingTimePartition(
            time_partition.to_string(),
        ));
    }

    add_parseable_fields_to_static_schema(parsed_schema)
}

/// Finalizes a parsed static schema: rejects use of the reserved timestamp
/// column, prepends `p_timestamp` at index 0, and wraps the result in an
/// `arrow_schema::Schema`.
fn add_parseable_fields_to_static_schema(
    parsed_schema: ParsedSchema,
) -> Result<Arc<Schema>, StaticSchemaError> {
    // Materialize the user fields as Arrow fields, in declaration order.
    let mut schema: Vec<Arc<Field>> = parsed_schema
        .fields
        .iter()
        .map(|f| Arc::new(Field::new(f.name.clone(), f.data_type.clone(), f.nullable)))
        .collect();

    // The user may not claim the reserved timestamp column for themselves.
    if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
        return Err(StaticSchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY));
    }

    // p_timestamp always occupies index 0 of the event schema.
    schema.insert(
        0,
        Arc::new(Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )),
    );

    Ok(Arc::new(Schema::new(schema)))
}

// Serde default: fields are nullable unless stated otherwise.
fn default_nullable() -> bool {
    true
}
// Serde default: dictionary id 0 (no dictionary encoding configured).
fn default_dict_id() -> i64 {
    0
}
// Serde default: dictionaries are unordered.
fn default_dict_is_ordered() -> bool {
    false
}
use serde::{Deserialize, Serialize};

fn validate_field_names(
field_name: &str,
existing_fields: &mut HashSet<String>,
) -> Result<(), StaticSchemaError> {
if field_name.is_empty() {
return Err(StaticSchemaError::EmptyFieldName);
}
use crate::event::DEFAULT_TIMESTAMP_KEY;

if !existing_fields.insert(field_name.to_string()) {
return Err(StaticSchemaError::DuplicateField(field_name.to_string()));
}
const DEFAULT_NULLABLE: bool = true;

Ok(())
}
// Field name as provided by the user.
type FieldName = String;
// Field type as a user-facing string, e.g. "int", "datetime", "string_list".
type FieldType = String;

#[derive(Debug, thiserror::Error)]
pub enum StaticSchemaError {
Expand All @@ -213,23 +46,102 @@ pub enum StaticSchemaError {
#[error("field name cannot be empty")]
EmptyFieldName,

#[error("duplicate field name: {0}")]
#[error("field {0:?} is duplicated")]
DuplicateField(String),
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_field_names() {
let mut existing_field_names: HashSet<String> = HashSet::new();
assert!(validate_field_names("", &mut existing_field_names).is_err());
}
/// One field of a user-declared static schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaFields {
    // Column name as given by the user.
    name: FieldName,
    // Type name as a string (e.g. "int", "string", "datetime", "string_list").
    data_type: FieldType,
}

/// User-provided static schema: the flat list of fields a stream declares up front.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StaticSchema {
    // Declared fields, in the order the user supplied them.
    fields: Vec<SchemaFields>,
}

impl StaticSchema {
pub fn convert_to_arrow_schema(
self,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, StaticSchemaError> {
let mut schema: Vec<Arc<Field>> = Vec::new();

// Convert to hashmap for easy access and operation
let mut fields: HashMap<FieldName, FieldType> = HashMap::new();

for field in self.fields {
if field.name.is_empty() {
return Err(StaticSchemaError::EmptyFieldName);
}

if fields.contains_key(&field.name) {
return Err(StaticSchemaError::DuplicateField(field.name.to_owned()));
}

fields.insert(field.name, field.data_type);
}

// Ensures all custom partitions are present in schema
if let Some(custom_partition) = custom_partition {
for partition in custom_partition.split(',') {
if !fields.contains_key(partition) {
return Err(StaticSchemaError::MissingCustomPartition(
partition.to_owned(),
));
}
}
}

// Ensures default timestamp is not mentioned(as it is only inserted by parseable)
if fields.contains_key(DEFAULT_TIMESTAMP_KEY) {
return Err(StaticSchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY));
}

// If time partitioning is enabled, mutate the datatype to be datetime
if !time_partition.is_empty() {
let Some(field_type) = fields.get_mut(time_partition) else {
return Err(StaticSchemaError::MissingTimePartition(
time_partition.to_owned(),
));
};
*field_type = "datetime".to_string();
}

for (field_name, field_type) in fields {
let data_type = match field_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
"int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
};
let field = Field::new(&field_name, data_type, DEFAULT_NULLABLE);
schema.push(Arc::new(field));
}

#[test]
fn duplicate_field_names() {
let mut existing_field_names: HashSet<String> = HashSet::new();
let _ = validate_field_names("test_field", &mut existing_field_names);
assert!(validate_field_names("test_field", &mut existing_field_names).is_err());
// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
Ok(Arc::new(Schema::new(schema)))
}
}
Loading