@@ -110,99 +110,115 @@ pub async fn resolve_parseable_metadata(
110110 . as_ref ( )
111111 . map ( |meta| serde_json:: from_slice ( meta) . expect ( "parseable config is valid json" ) ) ;
112112
113- // Env Change needs to be updated
114- let check = determine_environment ( staging_metadata, remote_metadata) ;
115- // flags for if metadata needs to be synced
116- let mut overwrite_staging = false ;
117- let mut overwrite_remote = false ;
118-
119- let res = match check {
120- EnvChange :: None ( metadata) => {
121- // overwrite staging anyways so that it matches remote in case of any divergence
122- overwrite_staging = true ;
123- if PARSEABLE . options . mode == Mode :: All {
124- metadata. server_mode . standalone_after_distributed ( ) ?;
125- }
126- Ok ( metadata)
127- } ,
128- EnvChange :: NewRemote => {
129- Err ( "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server" )
130- }
131- EnvChange :: NewStaging ( mut metadata) => {
113+ let env_change = determine_environment ( staging_metadata, remote_metadata) ;
132114
133- // if server is started in ingest mode,we need to make sure that query mode has been started
134- // i.e the metadata is updated to reflect the server mode = Query
135- if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
136- Err ( "Starting Ingest Mode is not allowed, Since Query Server has not been started yet" )
137- } else {
138- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
139- metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
140- // this flag is set to true so that metadata is copied to staging
141- overwrite_staging = true ;
142- // overwrite remote in all and query mode
143- // because staging dir has changed.
144- match PARSEABLE . options . mode {
145- Mode :: All => {
146- metadata. server_mode . standalone_after_distributed ( )
147- . map_err ( |err| {
148- ObjectStorageError :: Custom ( err. to_string ( ) )
149- } ) ?;
150- overwrite_remote = true ;
151- } ,
152- Mode :: Query | Mode :: Prism => {
153- overwrite_remote = true ;
154- metadata. server_mode = PARSEABLE . options . mode ;
155- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
156- } ,
157- Mode :: Ingest => {
158- // if ingest server is started fetch the metadata from remote
159- // update the server mode for local metadata
160- metadata. server_mode = PARSEABLE . options . mode ;
161- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
162- } ,
163- Mode :: Index => {
164- // if index server is started fetch the metadata from remote
165- // update the server mode for local metadata
166- metadata. server_mode = PARSEABLE . options . mode ;
167- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
168- }
169- }
170- Ok ( metadata)
171- }
172- }
173- EnvChange :: CreateBoth => {
174- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
175- let metadata = StorageMetadata :: default ( ) ;
176- // new metadata needs to be set
177- // if mode is query or all then both staging and remote
178- match PARSEABLE . options . mode {
179- Mode :: All | Mode :: Query | Mode :: Prism => overwrite_remote = true ,
180- _ => ( ) ,
181- }
182- // else only staging
183- overwrite_staging = true ;
184- Ok ( metadata)
185- }
186- } ;
187-
188- let mut metadata = res. map_err ( |err| {
189- let err = format ! ( "{}. {}" , err, JOIN_COMMUNITY ) ;
190- let err: Box < dyn std:: error:: Error + Send + Sync + ' static > = err. into ( ) ;
191- ObjectStorageError :: UnhandledError ( err)
192- } ) ?;
115+ let ( mut metadata, overwrite_staging, overwrite_remote) = process_env_change ( env_change) ?;
193116
194117 metadata. server_mode = PARSEABLE . options . mode ;
118+
195119 if overwrite_remote {
196120 put_remote_metadata ( & metadata) . await ?;
197121 }
198-
199122 if overwrite_staging {
200123 put_staging_metadata ( & metadata) ?;
201124 }
202125
203126 Ok ( metadata)
204127}
205128
129+ fn process_env_change (
130+ env_change : EnvChange ,
131+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
132+ match env_change {
133+ EnvChange :: None ( mut metadata) => handle_none_env ( & mut metadata) ,
134+ EnvChange :: NewRemote => handle_new_remote_env ( ) ,
135+ EnvChange :: NewStaging ( mut metadata) => handle_new_staging_env ( & mut metadata) ,
136+ EnvChange :: CreateBoth => handle_create_both_env ( ) ,
137+ }
138+ }
139+
140+ fn handle_none_env (
141+ metadata : & mut StorageMetadata ,
142+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
143+ let overwrite_staging = true ;
144+ let mut overwrite_remote = false ;
145+
146+ match PARSEABLE . options . mode {
147+ Mode :: All => {
148+ metadata. server_mode . standalone_after_distributed ( ) ?;
149+ overwrite_remote = true ;
150+ update_metadata_mode_and_staging ( metadata) ;
151+ }
152+ Mode :: Query => {
153+ overwrite_remote = true ;
154+ update_metadata_mode_and_staging ( metadata) ;
155+ }
156+ _ => { }
157+ }
158+ if PARSEABLE . options . mode == Mode :: All {
159+ metadata. server_mode . standalone_after_distributed ( ) ?;
160+ }
161+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
162+ }
163+
164+ fn handle_new_remote_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
165+ Err ( ObjectStorageError :: UnhandledError ( format ! (
166+ "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}" ,
167+ JOIN_COMMUNITY
168+ ) . into ( ) ) )
169+ }
170+
171+ fn handle_new_staging_env (
172+ metadata : & mut StorageMetadata ,
173+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
174+ if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
175+ return Err ( ObjectStorageError :: UnhandledError (
176+ format ! (
177+ "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
178+ JOIN_COMMUNITY
179+ )
180+ . into ( ) ,
181+ ) ) ;
182+ }
183+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
184+ metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
185+ let overwrite_staging = true ;
186+ let mut overwrite_remote = false ;
187+
188+ match PARSEABLE . options . mode {
189+ Mode :: All => {
190+ metadata
191+ . server_mode
192+ . standalone_after_distributed ( )
193+ . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
194+ overwrite_remote = true ;
195+ }
196+ Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
197+ update_metadata_mode_and_staging ( metadata) ;
198+ if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
199+ overwrite_remote = true ;
200+ }
201+ }
202+ }
203+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
204+ }
205+
206+ fn handle_create_both_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
207+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
208+ let metadata = StorageMetadata :: default ( ) ;
209+ let overwrite_remote = matches ! (
210+ PARSEABLE . options. mode,
211+ Mode :: All | Mode :: Query | Mode :: Prism
212+ ) ;
213+ let overwrite_staging = true ;
214+ Ok ( ( metadata, overwrite_staging, overwrite_remote) )
215+ }
216+
217+ fn update_metadata_mode_and_staging ( metadata : & mut StorageMetadata ) {
218+ metadata. server_mode = PARSEABLE . options . mode ;
219+ metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
220+ }
221+
206222pub fn determine_environment (
207223 staging_metadata : Option < StorageMetadata > ,
208224 remote_metadata : Option < StorageMetadata > ,
0 commit comments