@@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
2626use crate :: option:: CONFIG ;
2727
2828use crate :: metrics:: prom_utils:: Metrics ;
29+ use crate :: rbac:: user:: User ;
2930use crate :: stats:: Stats ;
3031use crate :: storage:: object_storage:: ingestor_metadata_path;
3132use crate :: storage:: { ObjectStorageError , STREAM_ROOT_DIRECTORY } ;
@@ -39,13 +40,15 @@ use itertools::Itertools;
3940use relative_path:: RelativePathBuf ;
4041use serde:: de:: Error ;
4142use serde_json:: error:: Error as SerdeError ;
42- use serde_json:: Value as JsonValue ;
43+ use serde_json:: { to_vec , Value as JsonValue } ;
4344use url:: Url ;
4445type IngestorMetadataArr = Vec < IngestorMetadata > ;
4546
4647use self :: utils:: StorageStats ;
4748
4849use super :: base_path_without_preceding_slash;
50+ use super :: rbac:: RBACError ;
51+ use std:: collections:: HashSet ;
4952use std:: time:: Duration ;
5053
5154use super :: modal:: IngestorMetadata ;
@@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors(
9497 Ok ( ( ) )
9598}
9699
97- // forward the request to all ingestors to keep them in sync
100+ // forward the create/update stream request to all ingestors to keep them in sync
98101pub async fn sync_streams_with_ingestors (
99102 headers : HeaderMap ,
100103 body : Bytes ,
@@ -142,7 +145,218 @@ pub async fn sync_streams_with_ingestors(
142145 log:: error!(
143146 "failed to forward upsert stream request to ingestor: {}\n Response Returned: {:?}" ,
144147 ingestor. domain_name,
145- res
148+ res. text( ) . await
149+ ) ;
150+ }
151+ }
152+
153+ Ok ( ( ) )
154+ }
155+
156+ // forward the role update request to all ingestors to keep them in sync
157+ pub async fn sync_users_with_roles_with_ingestors (
158+ username : & String ,
159+ role : & HashSet < String > ,
160+ ) -> Result < ( ) , RBACError > {
161+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
162+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
163+ RBACError :: Anyhow ( err)
164+ } ) ?;
165+
166+ let client = reqwest:: Client :: new ( ) ;
167+ let role = to_vec ( & role. clone ( ) ) . map_err ( |err| {
168+ log:: error!( "Fatal: failed to serialize role: {:?}" , err) ;
169+ RBACError :: SerdeError ( err)
170+ } ) ?;
171+ for ingestor in ingestor_infos. iter ( ) {
172+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
173+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
174+ continue ;
175+ }
176+ let url = format ! (
177+ "{}{}/user/{}/role" ,
178+ ingestor. domain_name,
179+ base_path_without_preceding_slash( ) ,
180+ username
181+ ) ;
182+
183+ let res = client
184+ . put ( url)
185+ . header ( header:: AUTHORIZATION , & ingestor. token )
186+ . header ( header:: CONTENT_TYPE , "application/json" )
187+ . body ( role. clone ( ) )
188+ . send ( )
189+ . await
190+ . map_err ( |err| {
191+ log:: error!(
192+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
193+ ingestor. domain_name,
194+ err
195+ ) ;
196+ RBACError :: Network ( err)
197+ } ) ?;
198+
199+ if !res. status ( ) . is_success ( ) {
200+ log:: error!(
201+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
202+ ingestor. domain_name,
203+ res. text( ) . await
204+ ) ;
205+ }
206+ }
207+
208+ Ok ( ( ) )
209+ }
210+
211+ // forward the delete user request to all ingestors to keep them in sync
212+ pub async fn sync_user_deletion_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
213+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
214+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
215+ RBACError :: Anyhow ( err)
216+ } ) ?;
217+
218+ let client = reqwest:: Client :: new ( ) ;
219+ for ingestor in ingestor_infos. iter ( ) {
220+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
221+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
222+ continue ;
223+ }
224+ let url = format ! (
225+ "{}{}/user/{}" ,
226+ ingestor. domain_name,
227+ base_path_without_preceding_slash( ) ,
228+ username
229+ ) ;
230+
231+ let res = client
232+ . delete ( url)
233+ . header ( header:: AUTHORIZATION , & ingestor. token )
234+ . send ( )
235+ . await
236+ . map_err ( |err| {
237+ log:: error!(
238+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
239+ ingestor. domain_name,
240+ err
241+ ) ;
242+ RBACError :: Network ( err)
243+ } ) ?;
244+
245+ if !res. status ( ) . is_success ( ) {
246+ log:: error!(
247+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
248+ ingestor. domain_name,
249+ res. text( ) . await
250+ ) ;
251+ }
252+ }
253+
254+ Ok ( ( ) )
255+ }
256+
257+ // forward the create user request to all ingestors to keep them in sync
258+ pub async fn sync_user_creation_with_ingestors (
259+ user : User ,
260+ role : & Option < HashSet < String > > ,
261+ ) -> Result < ( ) , RBACError > {
262+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
263+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
264+ RBACError :: Anyhow ( err)
265+ } ) ?;
266+
267+ let mut user = user. clone ( ) ;
268+
269+ if let Some ( role) = role {
270+ user. roles . clone_from ( role) ;
271+ }
272+ let username = user. username ( ) ;
273+ let client = reqwest:: Client :: new ( ) ;
274+
275+ let user = to_vec ( & user) . map_err ( |err| {
276+ log:: error!( "Fatal: failed to serialize user: {:?}" , err) ;
277+ RBACError :: SerdeError ( err)
278+ } ) ?;
279+
280+ for ingestor in ingestor_infos. iter ( ) {
281+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
282+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
283+ continue ;
284+ }
285+ let url = format ! (
286+ "{}{}/user/{}" ,
287+ ingestor. domain_name,
288+ base_path_without_preceding_slash( ) ,
289+ username
290+ ) ;
291+
292+ let res = client
293+ . post ( url)
294+ . header ( header:: AUTHORIZATION , & ingestor. token )
295+ . header ( header:: CONTENT_TYPE , "application/json" )
296+ . body ( user. clone ( ) )
297+ . send ( )
298+ . await
299+ . map_err ( |err| {
300+ log:: error!(
301+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
302+ ingestor. domain_name,
303+ err
304+ ) ;
305+ RBACError :: Network ( err)
306+ } ) ?;
307+
308+ if !res. status ( ) . is_success ( ) {
309+ log:: error!(
310+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
311+ ingestor. domain_name,
312+ res. text( ) . await
313+ ) ;
314+ }
315+ }
316+
317+ Ok ( ( ) )
318+ }
319+
320+ // forward the password reset request to all ingestors to keep them in sync
321+ pub async fn sync_password_reset_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
322+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
323+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
324+ RBACError :: Anyhow ( err)
325+ } ) ?;
326+ let client = reqwest:: Client :: new ( ) ;
327+
328+ for ingestor in ingestor_infos. iter ( ) {
329+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
330+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
331+ continue ;
332+ }
333+ let url = format ! (
334+ "{}{}/user/{}/generate-new-password" ,
335+ ingestor. domain_name,
336+ base_path_without_preceding_slash( ) ,
337+ username
338+ ) ;
339+
340+ let res = client
341+ . post ( url)
342+ . header ( header:: AUTHORIZATION , & ingestor. token )
343+ . header ( header:: CONTENT_TYPE , "application/json" )
344+ . send ( )
345+ . await
346+ . map_err ( |err| {
347+ log:: error!(
348+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
349+ ingestor. domain_name,
350+ err
351+ ) ;
352+ RBACError :: Network ( err)
353+ } ) ?;
354+
355+ if !res. status ( ) . is_success ( ) {
356+ log:: error!(
357+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
358+ ingestor. domain_name,
359+ res. text( ) . await
146360 ) ;
147361 }
148362 }
0 commit comments