@@ -35,11 +35,12 @@ use http::StatusCode;
3535use itertools:: Itertools ;
3636use serde:: { Deserialize , Serialize } ;
3737use serde_json:: { json, Value } ;
38- use std:: collections:: HashMap ;
38+ use std:: collections:: { HashMap , HashSet } ;
3939use std:: pin:: Pin ;
4040use std:: sync:: Arc ;
4141use std:: time:: Instant ;
42- use tracing:: error;
42+ use tokio:: task:: JoinSet ;
43+ use tracing:: { error, warn} ;
4344
4445use crate :: event:: commit_schema;
4546use crate :: metrics:: QUERY_EXECUTE_TIME ;
@@ -126,7 +127,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
126127 {
127128 Ok ( raw_logical_plan) => raw_logical_plan,
128129 Err ( _) => {
129- create_streams_for_querier ( ) . await ;
130+ create_streams_for_querier ( ) . await ? ;
130131 session_state
131132 . create_logical_plan ( & query_request. query )
132133 . await ?
@@ -433,17 +434,45 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
433434/// Create streams for querier if they do not exist
434435/// get list of streams from memory and storage
435436/// create streams for memory from storage if they do not exist
436- pub async fn create_streams_for_querier ( ) {
437- let querier_streams = PARSEABLE . streams . list ( ) ;
437+ pub async fn create_streams_for_querier ( ) -> Result < ( ) , QueryError > {
438438 let store = PARSEABLE . storage . get_object_store ( ) ;
439- let storage_streams = store. list_streams ( ) . await . unwrap ( ) ;
440- for stream_name in storage_streams {
441- if !querier_streams. contains ( & stream_name) {
442- let _ = PARSEABLE
439+ let querier_streams = PARSEABLE . streams . list ( ) ;
440+
441+ let querier_streams_set: HashSet < _ > = querier_streams. into_iter ( ) . collect ( ) ;
442+
443+ let storage_streams = store. list_streams ( ) . await ?;
444+
445+ let missing_streams: Vec < _ > = storage_streams
446+ . into_iter ( )
447+ . filter ( |stream_name| !querier_streams_set. contains ( stream_name) )
448+ . collect ( ) ;
449+
450+ if missing_streams. is_empty ( ) {
451+ return Ok ( ( ) ) ;
452+ }
453+
454+ let mut join_set = JoinSet :: new ( ) ;
455+ for stream_name in missing_streams {
456+ join_set. spawn ( async move {
457+ let result = PARSEABLE
443458 . create_stream_and_schema_from_storage ( & stream_name)
444459 . await ;
460+
461+ if let Err ( e) = & result {
462+ warn ! ( "Failed to create stream '{}': {}" , stream_name, e) ;
463+ }
464+
465+ ( stream_name, result)
466+ } ) ;
467+ }
468+
469+ while let Some ( result) = join_set. join_next ( ) . await {
470+ if let Err ( join_error) = result {
471+ warn ! ( "Task join error: {}" , join_error) ;
445472 }
446473 }
474+
475+ Ok ( ( ) )
447476}
448477
449478impl FromRequest for Query {
0 commit comments