@@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
2626use crate :: option:: CONFIG ;
2727
2828use crate :: metrics:: prom_utils:: Metrics ;
29+ use crate :: rbac:: user:: User ;
2930use crate :: stats:: Stats ;
3031use crate :: storage:: object_storage:: ingestor_metadata_path;
3132use crate :: storage:: PARSEABLE_ROOT_DIRECTORY ;
@@ -39,13 +40,15 @@ use itertools::Itertools;
3940use relative_path:: RelativePathBuf ;
4041use serde:: de:: Error ;
4142use serde_json:: error:: Error as SerdeError ;
42- use serde_json:: Value as JsonValue ;
43+ use serde_json:: { to_vec , Value as JsonValue } ;
4344use url:: Url ;
4445type IngestorMetadataArr = Vec < IngestorMetadata > ;
4546
4647use self :: utils:: StorageStats ;
4748
4849use super :: base_path_without_preceding_slash;
50+ use super :: rbac:: RBACError ;
51+ use std:: collections:: HashSet ;
4952use std:: time:: Duration ;
5053
5154use super :: modal:: IngestorMetadata ;
@@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors(
9497 Ok ( ( ) )
9598}
9699
97- // forward the request to all ingestors to keep them in sync
100+ // forward the create/update stream request to all ingestors to keep them in sync
98101pub async fn sync_streams_with_ingestors (
99102 req : HttpRequest ,
100103 body : Bytes ,
@@ -137,7 +140,218 @@ pub async fn sync_streams_with_ingestors(
137140 log:: error!(
138141 "failed to forward upsert stream request to ingestor: {}\n Response Returned: {:?}" ,
139142 ingestor. domain_name,
140- res
143+ res. text( ) . await
144+ ) ;
145+ }
146+ }
147+
148+ Ok ( ( ) )
149+ }
150+
151+ // forward the role update request to all ingestors to keep them in sync
152+ pub async fn sync_users_with_roles_with_ingestors (
153+ username : & String ,
154+ role : & HashSet < String > ,
155+ ) -> Result < ( ) , RBACError > {
156+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
157+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
158+ RBACError :: Anyhow ( err)
159+ } ) ?;
160+
161+ let client = reqwest:: Client :: new ( ) ;
162+ let role = to_vec ( & role. clone ( ) ) . map_err ( |err| {
163+ log:: error!( "Fatal: failed to serialize role: {:?}" , err) ;
164+ RBACError :: SerdeError ( err)
165+ } ) ?;
166+ for ingestor in ingestor_infos. iter ( ) {
167+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
168+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
169+ continue ;
170+ }
171+ let url = format ! (
172+ "{}{}/user/{}/role" ,
173+ ingestor. domain_name,
174+ base_path_without_preceding_slash( ) ,
175+ username
176+ ) ;
177+
178+ let res = client
179+ . put ( url)
180+ . header ( header:: AUTHORIZATION , & ingestor. token )
181+ . header ( header:: CONTENT_TYPE , "application/json" )
182+ . body ( role. clone ( ) )
183+ . send ( )
184+ . await
185+ . map_err ( |err| {
186+ log:: error!(
187+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
188+ ingestor. domain_name,
189+ err
190+ ) ;
191+ RBACError :: Network ( err)
192+ } ) ?;
193+
194+ if !res. status ( ) . is_success ( ) {
195+ log:: error!(
196+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
197+ ingestor. domain_name,
198+ res. text( ) . await
199+ ) ;
200+ }
201+ }
202+
203+ Ok ( ( ) )
204+ }
205+
206+ // forward the delete user request to all ingestors to keep them in sync
207+ pub async fn sync_user_deletion_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
208+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
209+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
210+ RBACError :: Anyhow ( err)
211+ } ) ?;
212+
213+ let client = reqwest:: Client :: new ( ) ;
214+ for ingestor in ingestor_infos. iter ( ) {
215+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
216+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
217+ continue ;
218+ }
219+ let url = format ! (
220+ "{}{}/user/{}" ,
221+ ingestor. domain_name,
222+ base_path_without_preceding_slash( ) ,
223+ username
224+ ) ;
225+
226+ let res = client
227+ . delete ( url)
228+ . header ( header:: AUTHORIZATION , & ingestor. token )
229+ . send ( )
230+ . await
231+ . map_err ( |err| {
232+ log:: error!(
233+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
234+ ingestor. domain_name,
235+ err
236+ ) ;
237+ RBACError :: Network ( err)
238+ } ) ?;
239+
240+ if !res. status ( ) . is_success ( ) {
241+ log:: error!(
242+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
243+ ingestor. domain_name,
244+ res. text( ) . await
245+ ) ;
246+ }
247+ }
248+
249+ Ok ( ( ) )
250+ }
251+
252+ // forward the create user request to all ingestors to keep them in sync
253+ pub async fn sync_user_creation_with_ingestors (
254+ user : User ,
255+ role : & Option < HashSet < String > > ,
256+ ) -> Result < ( ) , RBACError > {
257+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
258+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
259+ RBACError :: Anyhow ( err)
260+ } ) ?;
261+
262+ let mut user = user. clone ( ) ;
263+
264+ if let Some ( role) = role {
265+ user. roles . clone_from ( role) ;
266+ }
267+ let username = user. username ( ) ;
268+ let client = reqwest:: Client :: new ( ) ;
269+
270+ let user = to_vec ( & user) . map_err ( |err| {
271+ log:: error!( "Fatal: failed to serialize user: {:?}" , err) ;
272+ RBACError :: SerdeError ( err)
273+ } ) ?;
274+
275+ for ingestor in ingestor_infos. iter ( ) {
276+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
277+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
278+ continue ;
279+ }
280+ let url = format ! (
281+ "{}{}/user/{}" ,
282+ ingestor. domain_name,
283+ base_path_without_preceding_slash( ) ,
284+ username
285+ ) ;
286+
287+ let res = client
288+ . post ( url)
289+ . header ( header:: AUTHORIZATION , & ingestor. token )
290+ . header ( header:: CONTENT_TYPE , "application/json" )
291+ . body ( user. clone ( ) )
292+ . send ( )
293+ . await
294+ . map_err ( |err| {
295+ log:: error!(
296+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
297+ ingestor. domain_name,
298+ err
299+ ) ;
300+ RBACError :: Network ( err)
301+ } ) ?;
302+
303+ if !res. status ( ) . is_success ( ) {
304+ log:: error!(
305+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
306+ ingestor. domain_name,
307+ res. text( ) . await
308+ ) ;
309+ }
310+ }
311+
312+ Ok ( ( ) )
313+ }
314+
315+ // forward the password reset request to all ingestors to keep them in sync
316+ pub async fn sync_password_reset_with_ingestors ( username : & String ) -> Result < ( ) , RBACError > {
317+ let ingestor_infos = get_ingestor_info ( ) . await . map_err ( |err| {
318+ log:: error!( "Fatal: failed to get ingestor info: {:?}" , err) ;
319+ RBACError :: Anyhow ( err)
320+ } ) ?;
321+ let client = reqwest:: Client :: new ( ) ;
322+
323+ for ingestor in ingestor_infos. iter ( ) {
324+ if !utils:: check_liveness ( & ingestor. domain_name ) . await {
325+ log:: warn!( "Ingestor {} is not live" , ingestor. domain_name) ;
326+ continue ;
327+ }
328+ let url = format ! (
329+ "{}{}/user/{}/generate-new-password" ,
330+ ingestor. domain_name,
331+ base_path_without_preceding_slash( ) ,
332+ username
333+ ) ;
334+
335+ let res = client
336+ . post ( url)
337+ . header ( header:: AUTHORIZATION , & ingestor. token )
338+ . header ( header:: CONTENT_TYPE , "application/json" )
339+ . send ( )
340+ . await
341+ . map_err ( |err| {
342+ log:: error!(
343+ "Fatal: failed to forward request to ingestor: {}\n Error: {:?}" ,
344+ ingestor. domain_name,
345+ err
346+ ) ;
347+ RBACError :: Network ( err)
348+ } ) ?;
349+
350+ if !res. status ( ) . is_success ( ) {
351+ log:: error!(
352+ "failed to forward request to ingestor: {}\n Response Returned: {:?}" ,
353+ ingestor. domain_name,
354+ res. text( ) . await
141355 ) ;
142356 }
143357 }
0 commit comments