@@ -20,7 +20,7 @@ mod filter_optimizer;
2020mod listing_table_builder;
2121pub mod stream_schema_provider;
2222
23- use chrono:: { DateTime , Utc } ;
23+ use chrono:: { DateTime , Duration , Utc } ;
2424use chrono:: { NaiveDateTime , TimeZone } ;
2525use datafusion:: arrow:: record_batch:: RecordBatch ;
2626
@@ -32,8 +32,14 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
3232use datafusion:: prelude:: * ;
3333use itertools:: Itertools ;
3434use once_cell:: sync:: Lazy ;
35+ use tracing:: warn;
36+ use crate :: catalog:: snapshot:: Snapshot ;
37+ use crate :: catalog:: Snapshot as _;
38+ use relative_path:: RelativePathBuf ;
3539use serde_json:: { json, Value } ;
40+ use stream_schema_provider:: collect_manifest_files;
3641use std:: collections:: HashMap ;
42+ use std:: ops:: Bound ;
3743use std:: path:: { Path , PathBuf } ;
3844use std:: sync:: Arc ;
3945use sysinfo:: System ;
@@ -42,9 +48,10 @@ use self::error::ExecuteError;
4248use self :: stream_schema_provider:: GlobalSchemaProvider ;
4349pub use self :: stream_schema_provider:: PartialTimeFilter ;
4450use crate :: event;
51+ use crate :: handlers:: http:: query:: { DateBin , DateBinRecord , QueryError } ;
4552use crate :: metadata:: STREAM_INFO ;
46- use crate :: option:: CONFIG ;
47- use crate :: storage:: { ObjectStorageProvider , StorageDir } ;
53+ use crate :: option:: { Mode , CONFIG } ;
54+ use crate :: storage:: { ObjectStorageProvider , ObjectStoreFormat , StorageDir , STREAM_ROOT_DIRECTORY } ;
4855use crate :: utils:: time:: TimeRange ;
4956
5057pub static QUERY_SESSION : Lazy < SessionContext > =
@@ -195,6 +202,265 @@ impl Query {
195202 }
196203}
197204
205+ impl DateBin {
206+ /// This function is supposed to read maninfest files for the given stream,
207+ /// get the sum of `num_rows` between the `startTime` and `endTime`,
208+ /// divide that by number of bins and return in a manner acceptable for the console
209+ pub async fn get_bin_density ( & self ) -> Result < Vec < DateBinRecord > , QueryError > {
210+ let time_partition = STREAM_INFO . get_time_partition ( & self . stream )
211+ . map_err ( |err|anyhow:: Error :: msg ( err. to_string ( ) ) ) ?
212+ . unwrap_or ( event:: DEFAULT_TIMESTAMP_KEY . to_owned ( ) ) ;
213+
214+ let glob_storage = CONFIG . storage ( ) . get_object_store ( ) ;
215+
216+ let object_store = QUERY_SESSION . state ( )
217+ . runtime_env ( )
218+ . object_store_registry
219+ . get_store ( & glob_storage. store_url ( ) )
220+ . unwrap ( ) ;
221+
222+ // get time range
223+ let time_range = TimeRange :: parse_human_time ( & self . start_time , & self . end_time ) ?;
224+
225+ if time_range. start > time_range. end {
226+ todo ! ( )
227+ }
228+
229+ // get object store
230+ let object_store_format = glob_storage
231+ . get_object_store_format ( & self . stream )
232+ . await
233+ . map_err ( |err| DataFusionError :: Plan ( err. to_string ( ) ) ) ?;
234+
235+ // all the manifests will go here
236+ let mut merged_snapshot: Snapshot = Snapshot :: default ( ) ;
237+
238+ // get a list of manifests
239+ if CONFIG . parseable . mode == Mode :: Query {
240+ let path = RelativePathBuf :: from_iter ( [ & self . stream , STREAM_ROOT_DIRECTORY ] ) ;
241+ let obs = glob_storage
242+ . get_objects (
243+ Some ( & path) ,
244+ Box :: new ( |file_name| file_name. ends_with ( "stream.json" ) ) ,
245+ )
246+ . await ;
247+ if let Ok ( obs) = obs {
248+ for ob in obs {
249+ if let Ok ( object_store_format) =
250+ serde_json:: from_slice :: < ObjectStoreFormat > ( & ob)
251+ {
252+ let snapshot = object_store_format. snapshot ;
253+ for manifest in snapshot. manifest_list {
254+ merged_snapshot. manifest_list . push ( manifest) ;
255+ }
256+ }
257+ }
258+ }
259+ } else {
260+ merged_snapshot = object_store_format. snapshot ;
261+ }
262+
263+ // get final date bins
264+ let final_date_bins = self . get_bins ( & time_range) ;
265+
266+ // Download all the manifest files
267+ let time_filter = [
268+ PartialTimeFilter :: Low (
269+ Bound :: Included ( time_range. start . naive_utc ( ) )
270+ ) ,
271+ PartialTimeFilter :: High (
272+ Bound :: Included ( time_range. end . naive_utc ( ) )
273+ )
274+ ] ;
275+
276+ let start = Utc :: now ( ) ;
277+ let all_manifest_files = collect_manifest_files (
278+ object_store,
279+ merged_snapshot. manifests ( & time_filter)
280+ . into_iter ( )
281+ . sorted_by_key ( |file| file. time_lower_bound )
282+ . map ( |item| item. manifest_path )
283+ . collect ( ) ,
284+ )
285+ . await
286+ . map_err ( |err|anyhow:: Error :: msg ( err. to_string ( ) ) ) ?;
287+ let end = Utc :: now ( ) ;
288+ warn ! ( "time taken fetch menifest files- {:?}" , end. signed_duration_since( start) ) ;
289+
290+ // we have start and end times for each bin
291+ // we also have all the manifest files for the given time range
292+ // now we iterate over start and end times for each bin
293+ // then we iterate over the manifest files which are within that time range
294+ // we sum up the num_rows
295+ let mut date_bin_records = Vec :: new ( ) ;
296+
297+ let start = Utc :: now ( ) ;
298+ for bin in final_date_bins {
299+ // warn!("bin-\n{bin:?}\n");
300+ let date_bin_timestamp = match & bin[ 0 ] {
301+ PartialTimeFilter :: Low ( bound) => {
302+ match bound {
303+ Bound :: Included ( ts) => ts. and_utc ( ) . timestamp_millis ( ) ,
304+ _ => unreachable ! ( )
305+ }
306+ } ,
307+ _ => unreachable ! ( )
308+ } ;
309+
310+ // extract start and end time to compare
311+ let bin_start = match & bin[ 0 ] {
312+ PartialTimeFilter :: Low ( bound) => {
313+ match bound {
314+ Bound :: Included ( ts) => ts,
315+ _ => unreachable ! ( )
316+ }
317+ } ,
318+ _ => unreachable ! ( )
319+ } ;
320+ let bin_end = match & bin[ 1 ] {
321+ PartialTimeFilter :: High ( bound) => {
322+ match bound {
323+ Bound :: Included ( ts) |
324+ Bound :: Excluded ( ts) => ts,
325+ _ => unreachable ! ( )
326+ }
327+ } ,
328+ _ => unreachable ! ( )
329+ } ;
330+
331+ let manifests_for_time_range = merged_snapshot. manifests ( & bin) ;
332+ // warn!("manifests_for_time_range-\n{manifests_for_time_range:?}\n");
333+
334+ let total_num_rows = all_manifest_files
335+ . iter ( )
336+ . map ( |m| {
337+ m. files
338+ . iter ( )
339+ . filter ( |f| {
340+ let mut default = false ;
341+ // warn!("Entering new file with num_rows- {}\n",f.num_rows);
342+ for c in & f. columns {
343+ default = if c. name == time_partition {
344+ match & c. stats {
345+ Some ( stats) => {
346+ match stats {
347+ crate :: catalog:: column:: TypedStatistics :: Int ( int64_type) => {
348+ let min = DateTime :: from_timestamp_millis ( int64_type. min ) . unwrap ( ) . naive_utc ( ) ;
349+ let max = DateTime :: from_timestamp_millis ( int64_type. max ) . unwrap ( ) . naive_utc ( ) ;
350+
351+ if ( bin_start <= & min) && ( bin_end >= & min) {
352+ // warn!("true for\nmin- {min:?}\nmax- {max:?}\n");
353+ true
354+ } else {
355+ // warn!("false for\nmin- {min:?}\nmax- {max:?}\n");
356+ false
357+ }
358+ } ,
359+ _ => false
360+ }
361+ } ,
362+ None => false ,
363+ }
364+ } else {
365+ default
366+ } ;
367+ if default {
368+ break
369+ }
370+ }
371+ default
372+ } )
373+ . map ( |f|f. num_rows )
374+ . sum :: < u64 > ( )
375+ } )
376+ . sum :: < u64 > ( ) ;
377+
378+ date_bin_records. push (
379+ DateBinRecord {
380+ date_bin_timestamp : DateTime :: from_timestamp_millis ( date_bin_timestamp) . unwrap ( ) . to_rfc3339 ( ) ,
381+ log_count : total_num_rows,
382+ }
383+ ) ;
384+ // warn!("\ntotal_num_rows- {total_num_rows}");
385+ }
386+ let end = Utc :: now ( ) ;
387+ warn ! ( "time taken by for loop- {:?}" , end. signed_duration_since( start) ) ;
388+
389+ let min = 1736063097889 ;
390+ let max = 1736063100024 ;
391+ warn ! ( "p_timestamp\n min- {:?}\n max- {:?}" , DateTime :: from_timestamp_millis( min) , DateTime :: from_timestamp_millis( max) ) ;
392+
393+ Ok ( date_bin_records)
394+ }
395+
396+ /// calculate the endTime for each bin based on num bins
397+ fn get_bins ( & self , time_range : & TimeRange ) -> Vec < [ PartialTimeFilter ; 2 ] > {
398+ // get total minutes elapsed between start and end time
399+ let total_minutes = time_range. end . signed_duration_since ( time_range. start )
400+ . num_minutes ( ) as u64 ;
401+
402+ // divide minutes by num bins to get minutes per bin
403+ let quotient = ( total_minutes / self . num_bins ) as i64 ;
404+ let remainder = ( total_minutes % self . num_bins ) as i64 ;
405+ let have_remainder = remainder > 0 ;
406+
407+ warn ! ( "get_bins" ) ;
408+ warn ! ( "quotient- {quotient}\n remainder- {remainder}\n " ) ;
409+
410+ // now create multiple bins [startTime, endTime)
411+ // Should we exclude the last one???
412+ let mut final_date_bins = vec ! [ ] ;
413+
414+ let mut start = time_range. start ;
415+
416+ let loop_end = if have_remainder {
417+ self . num_bins
418+ } else {
419+ self . num_bins - 1
420+ } ;
421+
422+ for _ in 0 ..loop_end {
423+ let bin_end = start + Duration :: minutes ( quotient) ;
424+ final_date_bins. push ( [
425+ PartialTimeFilter :: Low (
426+ Bound :: Included ( start. naive_utc ( ) )
427+ ) ,
428+ PartialTimeFilter :: High (
429+ Bound :: Excluded ( bin_end. naive_utc ( ) )
430+ )
431+ ] ) ;
432+
433+ start = bin_end;
434+ }
435+
436+ // construct the last bin
437+ // if we have remainder, then the last bin will be as long as the remainder
438+ // else it will be as long as the quotient
439+ if have_remainder {
440+ final_date_bins. push ( [
441+ PartialTimeFilter :: Low (
442+ Bound :: Included ( start. naive_utc ( ) )
443+ ) ,
444+ PartialTimeFilter :: High (
445+ Bound :: Included ( ( start + Duration :: minutes ( remainder) ) . naive_utc ( ) )
446+ )
447+ ] ) ;
448+ } else {
449+ final_date_bins. push ( [
450+ PartialTimeFilter :: Low (
451+ Bound :: Included ( start. naive_utc ( ) )
452+ ) ,
453+ PartialTimeFilter :: High (
454+ Bound :: Included ( ( start + Duration :: minutes ( quotient) ) . naive_utc ( ) )
455+ )
456+ ] ) ;
457+ }
458+
459+ final_date_bins
460+ }
461+
462+ }
463+
198464#[ derive( Debug , Default ) ]
199465pub ( crate ) struct TableScanVisitor {
200466 tables : Vec < String > ,
0 commit comments