@@ -20,35 +20,28 @@ use std::sync::Arc;
2020
2121use actix_web:: http:: header:: ContentType ;
2222use arrow_schema:: Schema ;
23- use chrono:: { TimeDelta , Utc } ;
23+ use chrono:: Utc ;
2424use http:: StatusCode ;
25- use itertools:: Itertools ;
2625use serde:: { Deserialize , Serialize } ;
27- use serde_json:: { Value , json} ;
2826use tracing:: warn;
2927
3028use crate :: {
3129 LOCK_EXPECT ,
32- alerts:: alert_structs:: { ConditionConfig , Conditions } ,
33- event:: DEFAULT_TIMESTAMP_KEY ,
3430 handlers:: http:: {
3531 cluster:: {
3632 fetch_stats_from_ingestors,
3733 utils:: { IngestionStats , QueriedStats , StorageStats , merge_queried_stats} ,
3834 } ,
3935 logstream:: error:: StreamError ,
40- query:: { Query , QueryError , get_records_and_fields , update_schema_when_distributed} ,
36+ query:: { QueryError , update_schema_when_distributed} ,
4137 } ,
4238 hottier:: { HotTierError , HotTierManager , StreamHotTier } ,
4339 parseable:: { PARSEABLE , StreamNotFound } ,
44- query:: { CountConditions , CountsRequest , CountsResponse , error:: ExecuteError } ,
40+ query:: { CountsRequest , CountsResponse , error:: ExecuteError } ,
4541 rbac:: { Users , map:: SessionKey , role:: Action } ,
4642 stats,
4743 storage:: { StreamInfo , StreamType , retention:: Retention } ,
48- utils:: {
49- arrow:: record_batches_to_json,
50- time:: { TimeParseError , truncate_to_minute} ,
51- } ,
44+ utils:: time:: TimeParseError ,
5245 validator:: error:: HotTierValidationError ,
5346} ;
5447
@@ -299,7 +292,7 @@ impl PrismDatasetRequest {
299292
300293 // Process stream data
301294 match get_prism_logstream_info ( & stream) . await {
302- Ok ( info) => Ok ( Some ( self . build_dataset_response ( stream, info, & key ) . await ?) ) ,
295+ Ok ( info) => Ok ( Some ( self . build_dataset_response ( stream, info) . await ?) ) ,
303296 Err ( err) => Err ( err) ,
304297 }
305298 }
@@ -319,13 +312,12 @@ impl PrismDatasetRequest {
319312 & self ,
320313 stream : String ,
321314 info : PrismLogstreamInfo ,
322- key : & SessionKey ,
323315 ) -> Result < PrismDatasetResponse , PrismLogstreamError > {
324316 // Get hot tier info
325317 let hottier = self . get_hot_tier_info ( & stream) . await ?;
326318
327319 // Get counts
328- let counts = self . get_counts ( & stream, key ) . await ?;
320+ let counts = self . get_counts ( & stream) . await ?;
329321
330322 Ok ( PrismDatasetResponse {
331323 stream,
@@ -354,84 +346,20 @@ impl PrismDatasetRequest {
354346 }
355347 }
356348
357- async fn get_counts (
358- & self ,
359- stream : & str ,
360- key : & SessionKey ,
361- ) -> Result < CountsResponse , PrismLogstreamError > {
362- let end = truncate_to_minute ( Utc :: now ( ) ) ;
363- let start = end - TimeDelta :: hours ( 1 ) ;
364-
365- let conditions = if PARSEABLE . get_stream ( stream) ?. get_time_partition ( ) . is_some ( ) {
366- Some ( CountConditions {
367- conditions : Some ( Conditions {
368- operator : Some ( crate :: alerts:: LogicalOperator :: And ) ,
369- condition_config : vec ! [
370- ConditionConfig {
371- column: DEFAULT_TIMESTAMP_KEY . into( ) ,
372- operator: crate :: alerts:: WhereConfigOperator :: GreaterThanOrEqual ,
373- value: Some ( start. to_rfc3339( ) ) ,
374- } ,
375- ConditionConfig {
376- column: DEFAULT_TIMESTAMP_KEY . into( ) ,
377- operator: crate :: alerts:: WhereConfigOperator :: LessThan ,
378- value: Some ( end. to_rfc3339( ) ) ,
379- } ,
380- ] ,
381- } ) ,
382- group_by : None ,
383- } )
384- } else {
385- None
386- } ;
387-
349+ async fn get_counts ( & self , stream : & str ) -> Result < CountsResponse , PrismLogstreamError > {
388350 let count_request = CountsRequest {
389351 stream : stream. to_owned ( ) ,
390- start_time : start . to_rfc3339 ( ) ,
391- end_time : end . to_rfc3339 ( ) ,
352+ start_time : "1h" . to_owned ( ) ,
353+ end_time : "now" . to_owned ( ) ,
392354 num_bins : 10 ,
393- conditions,
355+ conditions : None ,
394356 } ;
395357
396- if count_request. conditions . is_some ( ) {
397- // forward request to querier
398- let query = count_request
399- . get_df_sql ( DEFAULT_TIMESTAMP_KEY . into ( ) )
400- . await ?;
401-
402- let query_request = Query {
403- query,
404- start_time : start. to_rfc3339 ( ) ,
405- end_time : end. to_rfc3339 ( ) ,
406- send_null : true ,
407- fields : true ,
408- streaming : false ,
409- filter_tags : None ,
410- } ;
411-
412- let ( records, _) = get_records_and_fields ( & query_request, key) . await ?;
413- if let Some ( records) = records {
414- let json_records = record_batches_to_json ( & records) ?;
415- let records = json_records. into_iter ( ) . map ( Value :: Object ) . collect_vec ( ) ;
416-
417- let res = json ! ( {
418- "fields" : vec![ "start_time" , "end_time" , "count" ] ,
419- "records" : records,
420- } ) ;
421-
422- Ok ( serde_json:: from_value ( res) ?)
423- } else {
424- Err ( PrismLogstreamError :: Anyhow ( anyhow:: Error :: msg (
425- "No data returned for counts SQL" ,
426- ) ) )
427- }
428- } else {
429- let records = count_request. get_bin_density ( ) . await ?;
430- Ok ( CountsResponse {
431- fields : vec ! [ "start_time" . into( ) , "end_time" . into( ) , "count" . into( ) ] ,
432- records,
433- } )
434- }
358+ let records = count_request. get_bin_density ( ) . await ?;
359+ Ok ( CountsResponse {
360+ fields : vec ! [ "start_time" . into( ) , "end_time" . into( ) , "count" . into( ) ] ,
361+ records,
362+ } )
435363 }
436364}
437365
0 commit comments