@@ -37,6 +37,7 @@ use crate::{
3737 } ,
3838} ;
3939use arrow_schema:: { ArrowError , Schema } ;
40+ use base64:: Engine ;
4041use chrono:: { NaiveDateTime , Timelike , Utc } ;
4142use parquet:: {
4243 arrow:: ArrowWriter ,
@@ -300,6 +301,9 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
300301
301302 // all the files should be in the staging directory root
302303 let entries = std:: fs:: read_dir ( path) ?;
304+ let url = get_url ( ) ;
305+ let port = url. port ( ) . expect ( "here port should be defined" ) . to_string ( ) ;
306+ let url = url. to_string ( ) ;
303307
304308 for entry in entries {
305309 // cause the staging directory will have only one file with ingestor in the name
@@ -313,15 +317,39 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
313317 . contains ( "ingestor" ) ;
314318
315319 if flag {
316- return Ok ( serde_json:: from_slice ( & std:: fs:: read ( path) ?) ?) ;
320+ // get the ingestor metadata from staging
321+ let mut meta: IngestorMetadata = serde_json:: from_slice ( & std:: fs:: read ( path) ?) ?;
322+
323+ // compare url endpoint and port
324+ if meta. domain_name != url {
325+ log:: info!( "Domain Name was Updated. Old: {} New: {}" , meta. domain_name, url) ;
326+ meta. domain_name = url;
327+ }
328+
329+ if meta. port != port {
330+ log:: info!( "Port was Updated. Old: {} New: {}" , meta. port, port) ;
331+ meta. port = port;
332+ }
333+
334+ let token = base64:: prelude:: BASE64_STANDARD . encode ( format ! ( "{}:{}" , CONFIG . parseable. username, CONFIG . parseable. password) ) ;
335+
336+ let token = format ! ( "Basic {}" , token) ;
337+
338+ if meta. token != token {
339+ // TODO: Update the message to be more informative with username and password
340+ log:: info!( "Credentials were Updated. Old: {} New: {}" , meta. token, token) ;
341+ meta. token = token;
342+ }
343+
344+ put_ingestor_info ( meta. clone ( ) ) ?;
345+ return Ok ( meta) ;
317346 }
318347 }
319348
320349 let store = CONFIG . storage ( ) . get_object_store ( ) ;
321- let url = get_url ( ) ;
322350 let out = IngestorMetadata :: new (
323- url . port ( ) . expect ( "here port should be defined" ) . to_string ( ) ,
324- url. to_string ( ) ,
351+ port,
352+ url,
325353 DEFAULT_VERSION . to_string ( ) ,
326354 store. get_bucket_name ( ) ,
327355 & CONFIG . parseable . username ,
@@ -333,6 +361,13 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
333361 Ok ( out)
334362}
335363
364+
365+ /// Puts the ingestor info into the staging.
366+ ///
367+ /// This function takes the ingestor info as a parameter and stores it in staging.
368+ /// # Parameters
369+ ///
370+ /// * `ingestor_info`: The ingestor info to be stored.
336371fn put_ingestor_info ( info : IngestorMetadata ) -> anyhow:: Result < ( ) > {
337372 let path = PathBuf :: from ( & CONFIG . parseable . local_staging_path ) ;
338373 let file_name = format ! ( "ingestor.{}.json" , info. ingestor_id) ;
0 commit comments