@@ -43,6 +43,8 @@ use actix_web::Scope;
4343use actix_web:: { web, App , HttpServer } ;
4444use actix_web_prometheus:: PrometheusMetrics ;
4545use async_trait:: async_trait;
46+ use base64:: Engine ;
47+ use itertools:: Itertools ;
4648use relative_path:: RelativePathBuf ;
4749use url:: Url ;
4850
@@ -98,6 +100,7 @@ impl ParseableServer for IngestServer {
98100
99101 /// implement the init method will just invoke the initialize method
100102 async fn init ( & self ) -> anyhow:: Result < ( ) > {
103+ self . validate ( ) ?;
101104 self . initialize ( ) . await
102105 }
103106
@@ -236,26 +239,59 @@ impl IngestServer {
236239 }
237240 }
238241
242+ async fn validate_credentials ( & self ) -> anyhow:: Result < ( ) > {
243+ // check if your creds match with others
244+ let store = CONFIG . storage ( ) . get_object_store ( ) ;
245+ let base_path = RelativePathBuf :: from ( "" ) ;
246+ let ingester_metadata = store
247+ . get_objects ( Some ( & base_path) )
248+ . await ?
249+ . iter ( )
250+ // this unwrap will most definateley shoot me in the foot later
251+ . map ( |x| serde_json:: from_slice :: < IngesterMetadata > ( x) . unwrap_or_default ( ) )
252+ . collect_vec ( ) ;
253+
254+ if !ingester_metadata. is_empty ( ) {
255+ let check = ingester_metadata[ 0 ] . token . clone ( ) ;
256+
257+ let token = base64:: prelude:: BASE64_STANDARD . encode ( format ! (
258+ "{}:{}" ,
259+ CONFIG . parseable. username, CONFIG . parseable. password
260+ ) ) ;
261+
262+ let token = format ! ( "Basic {}" , token) ;
263+
264+ if check != token {
265+ log:: error!( "Credentials do not match with other ingesters. Please check your credentials and try again." ) ;
266+ return Err ( anyhow:: anyhow!( "Credentials do not match with other ingesters. Please check your credentials and try again." ) ) ;
267+ }
268+ }
269+
270+ Ok ( ( ) )
271+ }
272+
239273 async fn initialize ( & self ) -> anyhow:: Result < ( ) > {
240274 // check for querier state. Is it there, or was it there in the past
241275 self . check_querier_state ( ) . await ?;
242276 // to get the .parseable.json file in staging
243- let meta = storage:: resolve_parseable_metadata ( ) . await ?;
244- banner:: print ( & CONFIG , & meta) . await ;
277+ self . validate_credentials ( ) . await ?;
278+
279+ let metadata = storage:: resolve_parseable_metadata ( ) . await ?;
280+ banner:: print ( & CONFIG , & metadata) . await ;
245281
246- rbac:: map:: init ( & meta ) ;
282+ rbac:: map:: init ( & metadata ) ;
247283
248284 // set the info in the global metadata
249- meta . set_global ( ) ;
285+ metadata . set_global ( ) ;
250286
251287 if let Some ( cache_manager) = LocalCacheManager :: global ( ) {
252288 cache_manager
253289 . validate ( CONFIG . parseable . local_cache_size )
254290 . await ?;
255291 } ;
256292
257- let prom = metrics:: build_metrics_handler ( ) ;
258- CONFIG . storage ( ) . register_store_metrics ( & prom ) ;
293+ let prometheus = metrics:: build_metrics_handler ( ) ;
294+ CONFIG . storage ( ) . register_store_metrics ( & prometheus ) ;
259295
260296 let storage = CONFIG . storage ( ) . get_object_store ( ) ;
261297 if let Err ( err) = metadata:: STREAM_INFO . load ( & * storage) . await {
@@ -273,7 +309,7 @@ impl IngestServer {
273309 if CONFIG . parseable . send_analytics {
274310 analytics:: init_analytics_scheduler ( ) ;
275311 }
276- let app = self . start ( prom , CONFIG . parseable . openid . clone ( ) ) ;
312+ let app = self . start ( prometheus , CONFIG . parseable . openid . clone ( ) ) ;
277313 tokio:: pin!( app) ;
278314 loop {
279315 tokio:: select! {
0 commit comments