@@ -37,6 +37,7 @@ use crate::{
3737 } ,
3838} ;
3939use arrow_schema:: { ArrowError , Schema } ;
40+ use base64:: Engine ;
4041use chrono:: { NaiveDateTime , Timelike , Utc } ;
4142use parquet:: {
4243 arrow:: ArrowWriter ,
@@ -300,6 +301,9 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
300301
301302 // all the files should be in the staging directory root
302303 let entries = std:: fs:: read_dir ( path) ?;
304+ let url = get_url ( ) ;
305+ let port = url. port ( ) . expect ( "here port should be defined" ) . to_string ( ) ;
306+ let url = url. to_string ( ) ;
303307
304308 for entry in entries {
305309 // cause the staging directory will have only one file with ingestor in the name
@@ -313,15 +317,50 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
313317 . contains ( "ingestor" ) ;
314318
315319 if flag {
316- return Ok ( serde_json:: from_slice ( & std:: fs:: read ( path) ?) ?) ;
320+ // get the ingestor metadata from staging
321+ let mut meta: IngestorMetadata = serde_json:: from_slice ( & std:: fs:: read ( path) ?) ?;
322+
323+ // compare url endpoint and port
324+ if meta. domain_name != url {
325+ log:: info!(
326+ "Domain Name was Updated. Old: {} New: {}" ,
327+ meta. domain_name,
328+ url
329+ ) ;
330+ meta. domain_name = url;
331+ }
332+
333+ if meta. port != port {
334+ log:: info!( "Port was Updated. Old: {} New: {}" , meta. port, port) ;
335+ meta. port = port;
336+ }
337+
338+ let token = base64:: prelude:: BASE64_STANDARD . encode ( format ! (
339+ "{}:{}" ,
340+ CONFIG . parseable. username, CONFIG . parseable. password
341+ ) ) ;
342+
343+ let token = format ! ( "Basic {}" , token) ;
344+
345+ if meta. token != token {
346+ // TODO: Update the message to be more informative with username and password
347+ log:: info!(
348+ "Credentials were Updated. Old: {} New: {}" ,
349+ meta. token,
350+ token
351+ ) ;
352+ meta. token = token;
353+ }
354+
355+ put_ingestor_info ( meta. clone ( ) ) ?;
356+ return Ok ( meta) ;
317357 }
318358 }
319359
320360 let store = CONFIG . storage ( ) . get_object_store ( ) ;
321- let url = get_url ( ) ;
322361 let out = IngestorMetadata :: new (
323- url . port ( ) . expect ( "here port should be defined" ) . to_string ( ) ,
324- url. to_string ( ) ,
362+ port,
363+ url,
325364 DEFAULT_VERSION . to_string ( ) ,
326365 store. get_bucket_name ( ) ,
327366 & CONFIG . parseable . username ,
@@ -333,6 +372,12 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
333372 Ok ( out)
334373}
335374
375+ /// Puts the ingestor info into the staging.
376+ ///
377+ /// This function takes the ingestor info as a parameter and stores it in staging.
378+ /// # Parameters
379+ ///
380+ /// * `ingestor_info`: The ingestor info to be stored.
336381fn put_ingestor_info ( info : IngestorMetadata ) -> anyhow:: Result < ( ) > {
337382 let path = PathBuf :: from ( & CONFIG . parseable . local_staging_path ) ;
338383 let file_name = format ! ( "ingestor.{}.json" , info. ingestor_id) ;
0 commit comments