@@ -24,7 +24,7 @@ use chrono::Utc;
2424use http:: StatusCode ;
2525use serde:: { Deserialize , Serialize } ;
2626use serde_json:: { json, Value } ;
27- use tracing:: { debug , error , warn} ;
27+ use tracing:: warn;
2828
2929use crate :: {
3030 handlers:: http:: {
@@ -247,79 +247,143 @@ impl PrismDatasetRequest {
247247 mut self ,
248248 key : SessionKey ,
249249 ) -> Result < Vec < PrismDatasetResponse > , PrismLogstreamError > {
250- let is_empty = self . streams . is_empty ( ) ;
251- if is_empty {
250+ if self . streams . is_empty ( ) {
252251 self . streams = PARSEABLE . streams . list ( ) ;
253252 }
254253
255- let mut responses = vec ! [ ] ;
256- for stream in self . streams . iter ( ) {
257- if Users . authorize ( key. clone ( ) , Action :: ListStream , Some ( stream) , None )
258- != crate :: rbac:: Response :: Authorized
259- {
260- // Don't warn if listed from Parseable
261- if !is_empty {
262- warn ! ( "Unauthorized access requested for stream: {stream}" ) ;
254+ // Process streams concurrently
255+ let results = futures:: future:: join_all (
256+ self . streams
257+ . iter ( )
258+ . map ( |stream| self . process_stream ( stream. clone ( ) , key. clone ( ) ) ) ,
259+ )
260+ . await ;
261+
262+ // Collect successful responses and handle errors
263+ let mut responses = Vec :: new ( ) ;
264+ for result in results {
265+ match result {
266+ Ok ( Some ( response) ) => responses. push ( response) ,
267+ Ok ( None ) => {
268+ warn ! ( "Stream not found or unauthorized access" ) ;
269+ continue ;
270+ }
271+ Err ( err) => {
272+ warn ! ( "error: {err}" ) ;
273+ continue ;
263274 }
264- continue ;
265275 }
276+ }
266277
267- if PARSEABLE . check_or_load_stream ( stream) . await {
268- debug ! ( "Stream not found: {stream}" ) ;
269- continue ;
270- }
278+ Ok ( responses)
279+ }
271280
272- let PrismLogstreamInfo {
273- info,
274- schema,
275- stats,
276- retention,
277- } = get_prism_logstream_info ( stream) . await ?;
278-
279- let hottier = match HotTierManager :: global ( ) {
280- Some ( manager) => match manager. get_hot_tier ( stream) . await {
281- Ok ( stats) => Some ( stats) ,
282- Err ( HotTierError :: HotTierValidationError (
283- HotTierValidationError :: NotFound ( _) ,
284- ) ) => None ,
285- Err ( err) => return Err ( err. into ( ) ) ,
286- } ,
287- _ => None ,
288- } ;
289- let records = CountsRequest {
290- stream : stream. clone ( ) ,
291- start_time : "1h" . to_owned ( ) ,
292- end_time : "now" . to_owned ( ) ,
293- num_bins : 10 ,
294- }
295- . get_bin_density ( )
296- . await ?;
297- let counts = CountsResponse {
298- fields : vec ! [ "start_time" . into( ) , "end_time" . into( ) , "count" . into( ) ] ,
299- records,
300- } ;
301-
302- // Retrieve distinct values for source identifiers
303- // Returns None if fields aren't present or if query fails
304- let ips = self . get_distinct_entries ( stream, "p_src_ip" ) . await . ok ( ) ;
305- let user_agents = self . get_distinct_entries ( stream, "p_user_agent" ) . await . ok ( ) ;
306-
307- responses. push ( PrismDatasetResponse {
308- stream : stream. clone ( ) ,
309- info,
310- schema,
311- stats,
312- retention,
313- hottier,
314- counts,
315- distinct_sources : json ! ( {
316- "ips" : ips,
317- "user_agents" : user_agents
318- } ) ,
319- } )
281+ async fn process_stream (
282+ & self ,
283+ stream : String ,
284+ key : SessionKey ,
285+ ) -> Result < Option < PrismDatasetResponse > , PrismLogstreamError > {
286+ // Skip unauthorized streams
287+ if !self . is_authorized ( & stream, & key) {
288+ return Ok ( None ) ;
320289 }
321290
322- Ok ( responses)
291+ // Skip streams that don't exist
292+ if !self . stream_exists ( & stream) . await {
293+ return Ok ( None ) ;
294+ }
295+
296+ // Process stream data
297+ match get_prism_logstream_info ( & stream) . await {
298+ Ok ( info) => Ok ( Some ( self . build_dataset_response ( stream, info) . await ?) ) ,
299+ Err ( err) => Err ( err) ,
300+ }
301+ }
302+
303+ fn is_authorized ( & self , stream : & str , key : & SessionKey ) -> bool {
304+ if Users . authorize ( key. clone ( ) , Action :: ListStream , Some ( stream) , None )
305+ != crate :: rbac:: Response :: Authorized
306+ {
307+ warn ! ( "Unauthorized access requested for stream: {stream}" ) ;
308+ false
309+ } else {
310+ true
311+ }
312+ }
313+
314+ async fn stream_exists ( & self , stream : & str ) -> bool {
315+ if PARSEABLE . check_or_load_stream ( stream) . await {
316+ warn ! ( "Stream not found: {stream}" ) ;
317+ false
318+ } else {
319+ true
320+ }
321+ }
322+
323+ async fn build_dataset_response (
324+ & self ,
325+ stream : String ,
326+ info : PrismLogstreamInfo ,
327+ ) -> Result < PrismDatasetResponse , PrismLogstreamError > {
328+ // Get hot tier info
329+ let hottier = self . get_hot_tier_info ( & stream) . await ?;
330+
331+ // Get counts
332+ let counts = self . get_counts ( & stream) . await ?;
333+
334+ // Get distinct entries concurrently
335+ let ( ips_result, user_agents_result) = futures:: join!(
336+ self . get_distinct_entries( & stream, "p_src_ip" ) ,
337+ self . get_distinct_entries( & stream, "p_user_agent" )
338+ ) ;
339+
340+ let ips = ips_result. ok ( ) ;
341+ let user_agents = user_agents_result. ok ( ) ;
342+
343+ Ok ( PrismDatasetResponse {
344+ stream,
345+ info : info. info ,
346+ schema : info. schema ,
347+ stats : info. stats ,
348+ retention : info. retention ,
349+ hottier,
350+ counts,
351+ distinct_sources : json ! ( {
352+ "ips" : ips,
353+ "user_agents" : user_agents
354+ } ) ,
355+ } )
356+ }
357+
358+ async fn get_hot_tier_info (
359+ & self ,
360+ stream : & str ,
361+ ) -> Result < Option < StreamHotTier > , PrismLogstreamError > {
362+ match HotTierManager :: global ( ) {
363+ Some ( manager) => match manager. get_hot_tier ( stream) . await {
364+ Ok ( stats) => Ok ( Some ( stats) ) ,
365+ Err ( HotTierError :: HotTierValidationError ( HotTierValidationError :: NotFound ( _) ) ) => {
366+ Ok ( None )
367+ }
368+ Err ( err) => Err ( err. into ( ) ) ,
369+ } ,
370+ None => Ok ( None ) ,
371+ }
372+ }
373+
374+ async fn get_counts ( & self , stream : & str ) -> Result < CountsResponse , PrismLogstreamError > {
375+ let count_request = CountsRequest {
376+ stream : stream. to_owned ( ) ,
377+ start_time : "1h" . to_owned ( ) ,
378+ end_time : "now" . to_owned ( ) ,
379+ num_bins : 10 ,
380+ } ;
381+
382+ let records = count_request. get_bin_density ( ) . await ?;
383+ Ok ( CountsResponse {
384+ fields : vec ! [ "start_time" . into( ) , "end_time" . into( ) , "count" . into( ) ] ,
385+ records,
386+ } )
323387 }
324388
325389 /// Retrieves distinct values for a specific field in a stream.
0 commit comments