@@ -45,7 +45,6 @@ use crate::{handlers::http::base_path, option::CONFIG};
4545use actix_web:: web;
4646use actix_web:: web:: resource;
4747use actix_web:: Scope ;
48- use anyhow:: anyhow;
4948use async_trait:: async_trait;
5049use base64:: Engine ;
5150use bytes:: Bytes ;
@@ -85,14 +84,14 @@ impl ParseableServer for IngestServer {
8584 // parseable can't use local storage for persistence when running a distributed setup
8685 if CONFIG . get_storage_mode_string ( ) == "Local drive" {
8786 return Err ( anyhow:: Error :: msg (
88- "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage." ,
89- ) ) ;
87+ "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage." ,
88+ ) ) ;
9089 }
9190
9291 // check for querier state. Is it there, or was it there in the past
93- let parseable_json = self . check_querier_state ( ) . await ?;
92+ let parseable_json = check_querier_state ( ) . await ?;
9493 // to get the .parseable.json file in staging
95- self . validate_credentials ( ) . await ?;
94+ validate_credentials ( ) . await ?;
9695
9796 Ok ( parseable_json)
9897 }
@@ -112,7 +111,7 @@ impl ParseableServer for IngestServer {
112111 tokio:: spawn ( airplane:: server ( ) ) ;
113112
114113 // set the ingestor metadata
115- self . set_ingestor_metadata ( ) . await ?;
114+ set_ingestor_metadata ( ) . await ?;
116115
117116 // Ingestors shouldn't have to deal with OpenId auth flow
118117 let app = self . start ( prometheus, None ) ;
@@ -278,96 +277,92 @@ impl IngestServer {
278277 ) ,
279278 )
280279 }
280+ }
281+
282+ // create the ingestor metadata and put the .ingestor.json file in the object store
283+ pub async fn set_ingestor_metadata ( ) -> anyhow:: Result < ( ) > {
284+ let storage_ingestor_metadata = migrate_ingester_metadata ( ) . await ?;
285+ let store = CONFIG . storage ( ) . get_object_store ( ) ;
286+
287+ // find the meta file in staging if not generate new metadata
288+ let resource = INGESTOR_META . clone ( ) ;
289+ // use the id that was generated/found in the staging and
290+ // generate the path for the object store
291+ let path = ingestor_metadata_path ( None ) ;
292+
293+ // we are considering that we can always get from object store
294+ if let Some ( mut store_data) = storage_ingestor_metadata {
295+ if store_data. domain_name != INGESTOR_META . domain_name {
296+ store_data
297+ . domain_name
298+ . clone_from ( & INGESTOR_META . domain_name ) ;
299+ store_data. port . clone_from ( & INGESTOR_META . port ) ;
281300
282- // create the ingestor metadata and put the .ingestor.json file in the object store
283- async fn set_ingestor_metadata ( & self ) -> anyhow:: Result < ( ) > {
284- let storage_ingestor_metadata = migrate_ingester_metadata ( ) . await ?;
285- let store = CONFIG . storage ( ) . get_object_store ( ) ;
286-
287- // find the meta file in staging if not generate new metadata
288- let resource = INGESTOR_META . clone ( ) ;
289- // use the id that was generated/found in the staging and
290- // generate the path for the object store
291- let path = ingestor_metadata_path ( None ) ;
292-
293- // we are considering that we can always get from object store
294- if storage_ingestor_metadata. is_some ( ) {
295- let mut store_data = storage_ingestor_metadata. unwrap ( ) ;
296-
297- if store_data. domain_name != INGESTOR_META . domain_name {
298- store_data
299- . domain_name
300- . clone_from ( & INGESTOR_META . domain_name ) ;
301- store_data. port . clone_from ( & INGESTOR_META . port ) ;
302-
303- let resource = Bytes :: from ( serde_json:: to_vec ( & store_data) ?) ;
304-
305- // if pushing to object store fails propagate the error
306- return store
307- . put_object ( & path, resource)
308- . await
309- . map_err ( |err| anyhow ! ( err) ) ;
310- }
311- } else {
312- let resource = Bytes :: from ( serde_json:: to_vec ( & resource) ?) ;
301+ let resource = Bytes :: from ( serde_json:: to_vec ( & store_data) ?) ;
313302
303+ // if pushing to object store fails propagate the error
314304 store. put_object ( & path, resource) . await ?;
315305 }
306+ } else {
307+ let resource = Bytes :: from ( serde_json:: to_vec ( & resource) ?) ;
316308
317- Ok ( ( ) )
309+ store . put_object ( & path , resource ) . await ? ;
318310 }
319311
320- // check for querier state. Is it there, or was it there in the past
321- // this should happen before the set the ingestor metadata
322- async fn check_querier_state ( & self ) -> anyhow:: Result < Option < Bytes > , ObjectStorageError > {
323- // how do we check for querier state?
324- // based on the work flow of the system, the querier will always need to start first
325- // i.e the querier will create the `.parseable.json` file
326-
327- let store = CONFIG . storage ( ) . get_object_store ( ) ;
328- let path = parseable_json_path ( ) ;
312+ Ok ( ( ) )
313+ }
329314
330- let parseable_json = store. get_object ( & path) . await ;
331- match parseable_json {
332- Ok ( _) => Ok ( Some ( parseable_json. unwrap ( ) ) ) ,
333- Err ( _) => Err ( ObjectStorageError :: Custom (
315+ // check for querier state. Is it there, or was it there in the past
316+ // this should happen before the set the ingestor metadata
317+ async fn check_querier_state ( ) -> anyhow:: Result < Option < Bytes > , ObjectStorageError > {
318+ // how do we check for querier state?
319+ // based on the work flow of the system, the querier will always need to start first
320+ // i.e the querier will create the `.parseable.json` file
321+ let parseable_json = CONFIG
322+ . storage ( )
323+ . get_object_store ( )
324+ . get_object ( & parseable_json_path ( ) )
325+ . await
326+ . map_err ( |_| {
327+ ObjectStorageError :: Custom (
334328 "Query Server has not been started yet. Please start the querier server first."
335329 . to_string ( ) ,
336- ) ) ,
337- }
338- }
339-
340- async fn validate_credentials ( & self ) -> anyhow:: Result < ( ) > {
341- // check if your creds match with others
342- let store = CONFIG . storage ( ) . get_object_store ( ) ;
343- let base_path = RelativePathBuf :: from ( PARSEABLE_ROOT_DIRECTORY ) ;
344- let ingestor_metadata = store
345- . get_objects (
346- Some ( & base_path) ,
347- Box :: new ( |file_name| file_name. starts_with ( "ingestor" ) ) ,
348330 )
349- . await ?;
350- if !ingestor_metadata. is_empty ( ) {
351- let ingestor_metadata_value: Value =
352- serde_json:: from_slice ( & ingestor_metadata[ 0 ] ) . expect ( "ingestor.json is valid json" ) ;
353- let check = ingestor_metadata_value
354- . as_object ( )
355- . and_then ( |meta| meta. get ( "token" ) )
356- . and_then ( |token| token. as_str ( ) )
357- . unwrap ( ) ;
358-
359- let token = base64:: prelude:: BASE64_STANDARD . encode ( format ! (
360- "{}:{}" ,
361- CONFIG . parseable. username, CONFIG . parseable. password
362- ) ) ;
363-
364- let token = format ! ( "Basic {}" , token) ;
365-
366- if check != token {
367- return Err ( anyhow:: anyhow!( "Credentials do not match with other ingestors. Please check your credentials and try again." ) ) ;
368- }
369- }
331+ } ) ?;
332+
333+ Ok ( Some ( parseable_json) )
334+ }
370335
371- Ok ( ( ) )
336+ async fn validate_credentials ( ) -> anyhow:: Result < ( ) > {
337+ // check if your creds match with others
338+ let store = CONFIG . storage ( ) . get_object_store ( ) ;
339+ let base_path = RelativePathBuf :: from ( PARSEABLE_ROOT_DIRECTORY ) ;
340+ let ingestor_metadata = store
341+ . get_objects (
342+ Some ( & base_path) ,
343+ Box :: new ( |file_name| file_name. starts_with ( "ingestor" ) ) ,
344+ )
345+ . await ?;
346+ if !ingestor_metadata. is_empty ( ) {
347+ let ingestor_metadata_value: Value =
348+ serde_json:: from_slice ( & ingestor_metadata[ 0 ] ) . expect ( "ingestor.json is valid json" ) ;
349+ let check = ingestor_metadata_value
350+ . as_object ( )
351+ . and_then ( |meta| meta. get ( "token" ) )
352+ . and_then ( |token| token. as_str ( ) )
353+ . unwrap ( ) ;
354+
355+ let token = base64:: prelude:: BASE64_STANDARD . encode ( format ! (
356+ "{}:{}" ,
357+ CONFIG . parseable. username, CONFIG . parseable. password
358+ ) ) ;
359+
360+ let token = format ! ( "Basic {}" , token) ;
361+
362+ if check != token {
363+ return Err ( anyhow:: anyhow!( "Credentials do not match with other ingestors. Please check your credentials and try again." ) ) ;
364+ }
372365 }
366+
367+ Ok ( ( ) )
373368}
0 commit comments