File tree Expand file tree Collapse file tree 1 file changed +13
-3
lines changed Expand file tree Collapse file tree 1 file changed +13
-3
lines changed Original file line number Diff line number Diff line change 1919mod filter_optimizer;
2020mod table_provider;
2121
22- use chrono:: TimeZone ;
2322use chrono:: { DateTime , Utc } ;
23+ use chrono:: { TimeZone , Timelike } ;
2424use datafusion:: arrow:: datatypes:: Schema ;
2525use datafusion:: arrow:: record_batch:: RecordBatch ;
2626use datafusion:: datasource:: file_format:: parquet:: ParquetFormat ;
@@ -128,8 +128,18 @@ impl Query {
128128 ) -> Result < ( Vec < RecordBatch > , Vec < String > ) , ExecuteError > {
129129 let ctx = self . create_session_context ( ) ;
130130 let remote_listing_table = self . _remote_query ( storage) ?;
131- let memtable =
132- crate :: event:: STREAM_WRITERS . recordbatches_cloned ( & self . stream_name , & self . schema ) ;
131+
132+ let current_minute = Utc :: now ( )
133+ . with_second ( 0 )
134+ . and_then ( |x| x. with_nanosecond ( 0 ) )
135+ . expect ( "zeroed value is valid" ) ;
136+
137+ let memtable = if self . end > current_minute {
138+ crate :: event:: STREAM_WRITERS . recordbatches_cloned ( & self . stream_name , & self . schema )
139+ } else {
140+ None
141+ } ;
142+
133143 let table =
134144 QueryTableProvider :: try_new ( memtable, remote_listing_table, self . schema . clone ( ) ) ?;
135145
You can’t perform that action at this time.
0 commit comments