1818
1919use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
2020use crate :: utils:: arrow:: get_field;
21- use anyhow:: { anyhow, Error as AnyError } ;
2221use serde:: { Deserialize , Serialize } ;
2322use std:: str;
2423
@@ -27,6 +26,7 @@ use std::{
2726 collections:: { HashMap , HashSet } ,
2827 sync:: Arc ,
2928} ;
29+
3030#[ derive( Debug , Clone , PartialEq , Serialize , Deserialize ) ]
3131pub struct StaticSchema {
3232 fields : Vec < SchemaFields > ,
@@ -57,13 +57,12 @@ pub struct Fields {
5757}
5858
5959#[ derive( Default , Debug , Clone , PartialEq , Serialize , Deserialize ) ]
60-
6160pub struct Metadata { }
6261pub fn convert_static_schema_to_arrow_schema (
6362 static_schema : StaticSchema ,
6463 time_partition : & str ,
6564 custom_partition : Option < & String > ,
66- ) -> Result < Arc < Schema > , AnyError > {
65+ ) -> Result < Arc < Schema > , StaticSchemaError > {
6766 let mut parsed_schema = ParsedSchema {
6867 fields : Vec :: new ( ) ,
6968 metadata : HashMap :: new ( ) ,
@@ -86,7 +85,9 @@ pub fn convert_static_schema_to_arrow_schema(
8685
8786 for partition in & custom_partition_list {
8887 if !custom_partition_exists. contains_key ( * partition) {
89- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
88+ return Err ( StaticSchemaError :: MissingCustomPartition (
89+ partition. to_string ( ) ,
90+ ) ) ;
9091 }
9192 }
9293 }
@@ -134,29 +135,24 @@ pub fn convert_static_schema_to_arrow_schema(
134135 parsed_schema. fields . push ( parsed_field) ;
135136 }
136137 if !time_partition. is_empty ( ) && !time_partition_exists {
137- return Err ( anyhow ! {
138- format!(
139- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
140- ) ,
141- } ) ;
138+ return Err ( StaticSchemaError :: MissingTimePartition (
139+ time_partition. to_string ( ) ,
140+ ) ) ;
142141 }
143142 add_parseable_fields_to_static_schema ( parsed_schema)
144143}
145144
146145fn add_parseable_fields_to_static_schema (
147146 parsed_schema : ParsedSchema ,
148- ) -> Result < Arc < Schema > , AnyError > {
147+ ) -> Result < Arc < Schema > , StaticSchemaError > {
149148 let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
150149 for field in parsed_schema. fields . iter ( ) {
151150 let field = Field :: new ( field. name . clone ( ) , field. data_type . clone ( ) , field. nullable ) ;
152151 schema. push ( Arc :: new ( field) ) ;
153152 }
154153
155154 if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
156- return Err ( anyhow ! (
157- "field {} is a reserved field" ,
158- DEFAULT_TIMESTAMP_KEY
159- ) ) ;
155+ return Err ( StaticSchemaError :: DefaultTime ) ;
160156 } ;
161157
162158 // add the p_timestamp field to the event schema to the 0th index
@@ -187,22 +183,43 @@ fn default_dict_is_ordered() -> bool {
187183fn validate_field_names (
188184 field_name : & str ,
189185 existing_fields : & mut HashSet < String > ,
190- ) -> Result < ( ) , AnyError > {
186+ ) -> Result < ( ) , StaticSchemaError > {
191187 if field_name. is_empty ( ) {
192- return Err ( anyhow ! ( "field names should not be empty" ) ) ;
188+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
193189 }
194190
195191 if !existing_fields. insert ( field_name. to_string ( ) ) {
196- return Err ( anyhow ! ( "duplicate field name: {}" , field_name) ) ;
192+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
197193 }
198194
199195 Ok ( ( ) )
200196}
201197
198+ #[ derive( Debug , thiserror:: Error ) ]
199+ pub enum StaticSchemaError {
200+ #[ error(
201+ "custom partition field {0} does not exist in the schema for the static schema logstream"
202+ ) ]
203+ MissingCustomPartition ( String ) ,
204+
205+ #[ error(
206+ "time partition field {0} does not exist in the schema for the static schema logstream"
207+ ) ]
208+ MissingTimePartition ( String ) ,
209+
210+ #[ error( "field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field" ) ]
211+ ReservedKey ( & ' static str ) ,
212+
213+ #[ error( "field name cannot be empty" ) ]
214+ EmptyFieldName ,
215+
216+ #[ error( "duplicate field name: {0}" ) ]
217+ DuplicateField ( String ) ,
218+ }
219+
202220#[ cfg( test) ]
203221mod tests {
204222 use super :: * ;
205- use std:: collections:: HashSet ;
206223 #[ test]
207224 fn empty_field_names ( ) {
208225 let mut existing_field_names: HashSet < String > = HashSet :: new ( ) ;
0 commit comments