@@ -102,7 +102,7 @@ pub trait EventFormat: Sized {
102102 if !Self :: is_schema_matching ( new_schema. clone ( ) , storage_schema, static_schema_flag) {
103103 return Err ( anyhow ! ( "Schema mismatch" ) ) ;
104104 }
105- new_schema = update_field_type_in_schema ( new_schema, time_partition) ;
105+ new_schema = update_field_type_in_schema ( new_schema, None , time_partition, None ) ;
106106 let rb = Self :: decode ( data, new_schema. clone ( ) ) ?;
107107 let tags_arr = StringArray :: from_iter_values ( std:: iter:: repeat ( & tags) . take ( rb. num_rows ( ) ) ) ;
108108 let metadata_arr =
@@ -147,19 +147,101 @@ pub trait EventFormat: Sized {
147147 }
148148}
149149
150+ pub fn get_existing_fields (
151+ inferred_schema : Arc < Schema > ,
152+ existing_schema : Option < & HashMap < String , Arc < Field > > > ,
153+ ) -> Vec < Arc < Field > > {
154+ let mut existing_fields = Vec :: new ( ) ;
155+
156+ for field in inferred_schema. fields . iter ( ) {
157+ if existing_schema. map_or ( false , |schema| schema. contains_key ( field. name ( ) ) ) {
158+ existing_fields. push ( field. clone ( ) ) ;
159+ }
160+ }
161+
162+ existing_fields
163+ }
164+
165+ pub fn get_existing_timestamp_fields (
166+ existing_schema : & HashMap < String , Arc < Field > > ,
167+ ) -> Vec < Arc < Field > > {
168+ let mut timestamp_fields = Vec :: new ( ) ;
169+
170+ for field in existing_schema. values ( ) {
171+ if let DataType :: Timestamp ( TimeUnit :: Millisecond , None ) = field. data_type ( ) {
172+ timestamp_fields. push ( field. clone ( ) ) ;
173+ }
174+ }
175+
176+ timestamp_fields
177+ }
178+
179+ pub fn override_timestamp_fields (
180+ inferred_schema : Arc < Schema > ,
181+ existing_timestamp_fields : & [ Arc < Field > ] ,
182+ ) -> Arc < Schema > {
183+ let timestamp_field_names: Vec < & str > = existing_timestamp_fields
184+ . iter ( )
185+ . map ( |field| field. name ( ) . as_str ( ) )
186+ . collect ( ) ;
187+
188+ let updated_fields: Vec < Arc < Field > > = inferred_schema
189+ . fields ( )
190+ . iter ( )
191+ . map ( |field| {
192+ if timestamp_field_names. contains ( & field. name ( ) . as_str ( ) ) {
193+ Arc :: new ( Field :: new (
194+ field. name ( ) ,
195+ DataType :: Timestamp ( TimeUnit :: Millisecond , None ) ,
196+ field. is_nullable ( ) ,
197+ ) )
198+ } else {
199+ field. clone ( )
200+ }
201+ } )
202+ . collect ( ) ;
203+
204+ Arc :: new ( Schema :: new ( updated_fields) )
205+ }
206+
150207pub fn update_field_type_in_schema (
151- schema : Arc < Schema > ,
208+ inferred_schema : Arc < Schema > ,
209+ existing_schema : Option < & HashMap < String , Arc < Field > > > ,
152210 time_partition : Option < String > ,
211+ log_records : Option < & Vec < Value > > ,
153212) -> Arc < Schema > {
213+ let mut updated_schema = inferred_schema. clone ( ) ;
214+
215+ if let Some ( existing_schema) = existing_schema {
216+ let existing_fields = get_existing_fields ( inferred_schema. clone ( ) , Some ( existing_schema) ) ;
217+ let existing_timestamp_fields = get_existing_timestamp_fields ( existing_schema) ;
218+ // overriding known timestamp fields which were inferred as string fields
219+ updated_schema = override_timestamp_fields ( updated_schema, & existing_timestamp_fields) ;
220+ let existing_field_names: Vec < String > = existing_fields
221+ . iter ( )
222+ . map ( |field| field. name ( ) . clone ( ) )
223+ . collect ( ) ;
224+
225+ if let Some ( log_records) = log_records {
226+ for log_record in log_records {
227+ updated_schema = Arc :: new ( update_data_type_to_datetime (
228+ ( * updated_schema) . clone ( ) ,
229+ log_record. clone ( ) ,
230+ existing_field_names. clone ( ) ,
231+ ) ) ;
232+ }
233+ }
234+ }
235+
154236 if time_partition. is_none ( ) {
155- return schema ;
237+ return updated_schema ;
156238 }
157- let field_name = time_partition. unwrap ( ) ;
158- let new_schema: Vec < Field > = schema
239+ let time_partition_field_name = time_partition. unwrap ( ) ;
240+ let new_schema: Vec < Field > = updated_schema
159241 . fields ( )
160242 . iter ( )
161243 . map ( |field| {
162- if * field. name ( ) == field_name {
244+ if * field. name ( ) == time_partition_field_name {
163245 if field. data_type ( ) == & DataType :: Utf8 {
164246 let new_data_type = DataType :: Timestamp ( TimeUnit :: Millisecond , None ) ;
165247 Field :: new ( field. name ( ) . clone ( ) , new_data_type, true )
@@ -174,12 +256,16 @@ pub fn update_field_type_in_schema(
174256 Arc :: new ( Schema :: new ( new_schema) )
175257}
176258
177- pub fn update_data_type_to_datetime ( schema : Schema , value : Value ) -> Schema {
259+ pub fn update_data_type_to_datetime (
260+ schema : Schema ,
261+ value : Value ,
262+ ignore_field_names : Vec < String > ,
263+ ) -> Schema {
178264 let new_schema: Vec < Field > = schema
179265 . fields ( )
180266 . iter ( )
181267 . map ( |field| {
182- if field. data_type ( ) == & DataType :: Utf8 {
268+ if field. data_type ( ) == & DataType :: Utf8 && !ignore_field_names . contains ( field . name ( ) ) {
183269 if let Value :: Object ( map) = & value {
184270 if let Some ( Value :: String ( s) ) = map. get ( field. name ( ) ) {
185271 if DateTime :: parse_from_rfc3339 ( s) . is_ok ( ) {
0 commit comments