@@ -21,13 +21,15 @@ use super::{
2121 ObjectStoreFormat , Permisssion , StorageDir , StorageMetadata ,
2222} ;
2323use super :: {
24- ALERT_FILE_NAME , CORRELATIONS_ROOT_DIRECTORY , MANIFEST_FILE , PARSEABLE_METADATA_FILE_NAME ,
25- PARSEABLE_ROOT_DIRECTORY , SCHEMA_FILE_NAME , STREAM_METADATA_FILE_NAME , STREAM_ROOT_DIRECTORY ,
24+ Owner , ALERT_FILE_NAME , CORRELATIONS_ROOT_DIRECTORY , MANIFEST_FILE ,
25+ PARSEABLE_METADATA_FILE_NAME , PARSEABLE_ROOT_DIRECTORY , SCHEMA_FILE_NAME ,
26+ STREAM_METADATA_FILE_NAME , STREAM_ROOT_DIRECTORY ,
2627} ;
2728
2829use crate :: correlation:: { CorrelationConfig , CorrelationError } ;
2930use crate :: handlers:: http:: modal:: ingest_server:: INGESTOR_META ;
3031use crate :: handlers:: http:: users:: { DASHBOARDS_DIR , FILTER_DIR , USERS_ROOT_DIR } ;
32+ use crate :: metadata:: SchemaVersion ;
3133use crate :: metrics:: { EVENTS_STORAGE_SIZE_DATE , LIFETIME_EVENTS_STORAGE_SIZE } ;
3234use crate :: option:: Mode ;
3335use crate :: {
@@ -153,28 +155,22 @@ pub trait ObjectStorage: Send + Sync + 'static {
153155 schema : Arc < Schema > ,
154156 stream_type : & str ,
155157 ) -> Result < String , ObjectStorageError > {
156- let mut format = ObjectStoreFormat :: default ( ) ;
157- format. set_id ( CONFIG . parseable . username . clone ( ) ) ;
158- let permission = Permisssion :: new ( CONFIG . parseable . username . clone ( ) ) ;
159- format. permissions = vec ! [ permission] ;
160- format. created_at = Local :: now ( ) . to_rfc3339 ( ) ;
161- format. stream_type = Some ( stream_type. to_string ( ) ) ;
162- if time_partition. is_empty ( ) {
163- format. time_partition = None ;
164- } else {
165- format. time_partition = Some ( time_partition. to_string ( ) ) ;
166- }
167- format. time_partition_limit = time_partition_limit. map ( |limit| limit. to_string ( ) ) ;
168- if custom_partition. is_empty ( ) {
169- format. custom_partition = None ;
170- } else {
171- format. custom_partition = Some ( custom_partition. to_string ( ) ) ;
172- }
173- if static_schema_flag != "true" {
174- format. static_schema_flag = None ;
175- } else {
176- format. static_schema_flag = Some ( static_schema_flag. to_string ( ) ) ;
177- }
158+ let format = ObjectStoreFormat {
159+ created_at : Local :: now ( ) . to_rfc3339 ( ) ,
160+ permissions : vec ! [ Permisssion :: new( CONFIG . parseable. username. clone( ) ) ] ,
161+ stream_type : Some ( stream_type. to_string ( ) ) ,
162+ time_partition : ( !time_partition. is_empty ( ) ) . then ( || time_partition. to_string ( ) ) ,
163+ time_partition_limit : time_partition_limit. map ( |limit| limit. to_string ( ) ) ,
164+ custom_partition : ( !custom_partition. is_empty ( ) ) . then ( || custom_partition. to_string ( ) ) ,
165+ static_schema_flag : ( static_schema_flag == "true" )
166+ . then ( || static_schema_flag. to_string ( ) ) ,
167+ schema_version : SchemaVersion :: V1 , // NOTE: Newly created streams are all V1
168+ owner : Owner {
169+ id : CONFIG . parseable . username . clone ( ) ,
170+ group : CONFIG . parseable . username . clone ( ) ,
171+ } ,
172+ ..Default :: default ( )
173+ } ;
178174 let format_json = to_bytes ( & format) ;
179175 self . put_object ( & schema_path ( stream_name) , to_bytes ( & schema) )
180176 . await ?;
0 commit comments