Skip to content

Commit 80c04c1

Browse files
authored
engine: Service timeout shutdown (#1919)
* timeout shutdown * timeout shutdown, clippy & fmt * timeout shutdown proper name a default value * timeout shutdown interval && idle timeout
1 parent 76d1655 commit 80c04c1

File tree

3 files changed

+128
-20
lines changed

3 files changed

+128
-20
lines changed

crates/core-executor/src/service.rs

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
1+
use super::error::{self as ex_error, Result};
2+
use super::models::{AsyncQueryHandle, QueryContext, QueryResult, QueryResultStatus};
3+
use super::running_queries::{RunningQueries, RunningQueriesRegistry, RunningQuery};
4+
use super::session::UserSession;
5+
use crate::running_queries::RunningQueryId;
6+
use crate::session::{SESSION_INACTIVITY_EXPIRATION_SECONDS, to_unix};
7+
use crate::tracing::SpanTracer;
8+
use crate::utils::{Config, MemPoolType};
19
use bytes::{Buf, Bytes};
10+
use core_history::HistoryStore;
11+
use core_history::SlateDBHistoryStore;
12+
use core_history::{QueryRecordId, QueryResultError, QueryStatus};
13+
use core_metastore::{
14+
Database, Metastore, Schema, SchemaIdent, SlateDBMetastore, TableIdent as MetastoreTableIdent,
15+
Volume, VolumeType,
16+
};
217
use datafusion::arrow::array::RecordBatch;
318
use datafusion::arrow::csv::ReaderBuilder;
419
use datafusion::arrow::csv::reader::Format;
@@ -13,29 +28,13 @@ use datafusion::execution::memory_pool::{
1328
};
1429
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
1530
use datafusion_common::TableReference;
31+
use df_catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
1632
use snafu::ResultExt;
1733
use std::num::NonZeroUsize;
1834
use std::sync::atomic::Ordering;
1935
use std::vec;
2036
use std::{collections::HashMap, sync::Arc};
2137
use time::{Duration as DateTimeDuration, OffsetDateTime};
22-
23-
use super::error::{self as ex_error, Result};
24-
use super::models::{AsyncQueryHandle, QueryContext, QueryResult, QueryResultStatus};
25-
use super::running_queries::{RunningQueries, RunningQueriesRegistry, RunningQuery};
26-
use super::session::UserSession;
27-
use crate::running_queries::RunningQueryId;
28-
use crate::session::{SESSION_INACTIVITY_EXPIRATION_SECONDS, to_unix};
29-
use crate::tracing::SpanTracer;
30-
use crate::utils::{Config, MemPoolType};
31-
use core_history::HistoryStore;
32-
use core_history::SlateDBHistoryStore;
33-
use core_history::{QueryRecordId, QueryResultError, QueryStatus};
34-
use core_metastore::{
35-
Database, Metastore, Schema, SchemaIdent, SlateDBMetastore, TableIdent as MetastoreTableIdent,
36-
Volume, VolumeType,
37-
};
38-
use df_catalog::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
3938
use tokio::sync::RwLock;
4039
use tokio::sync::oneshot;
4140
use tokio::time::{Duration, timeout};
@@ -44,6 +43,8 @@ use uuid::Uuid;
4443

4544
const DEFAULT_SCHEMA: &str = "public";
4645

46+
/// Polling period, in seconds, for the idle-shutdown check.
///
/// The `embucketd` binary converts this value to a `Duration` and passes it as
/// the `interval` argument of `ExecutionService::timeout_signal`.
pub const TIMEOUT_SIGNAL_INTERVAL_SECONDS: u64 = 60;
47+
4748
#[async_trait::async_trait]
4849
pub trait ExecutionService: Send + Sync {
4950
async fn create_session(&self, session_id: &str) -> Result<Arc<UserSession>>;
@@ -145,6 +146,8 @@ pub trait ExecutionService: Send + Sync {
145146
file_name: &str,
146147
format: Format,
147148
) -> Result<usize>;
149+
150+
async fn timeout_signal(&self, interval: Duration, idle_timeout: Duration) -> ();
148151
}
149152

150153
pub struct CoreExecutionService {
@@ -891,6 +894,37 @@ impl ExecutionService for CoreExecutionService {
891894

892895
Ok(rows_loaded)
893896
}
897+
898+
/// Resolves once the service has been observed idle for at least `idle_timeout`.
///
/// "Idle" means that, at a polling tick, `self.df_sessions` is empty and
/// `self.queries.count()` is zero. The state is sampled once per `interval`;
/// the future never resolves while either condition keeps failing.
///
/// * `interval` - period between idle checks.
/// * `idle_timeout` - how long the idle state must persist before returning.
///
/// Idleness is only sampled at ticks, so the idle window starts at the first
/// tick that observes the idle state and the return happens on the first tick
/// at or after `idle_timeout` has elapsed from that point. Activity that starts
/// and ends entirely between two ticks is not seen and does not reset the window.
async fn timeout_signal(&self, interval: Duration, idle_timeout: Duration) -> () {
899+
let mut interval = tokio::time::interval(interval);
900+
interval.tick().await; // The first tick completes immediately; skip.
901+
// Moment at which the current uninterrupted idle period was first observed;
// `None` while the service is active.
let mut idle_since: Option<std::time::Instant> = None;
902+
loop {
903+
interval.tick().await;
904+
// The read guard lives only inside this block, so the sessions lock is
// released before the query count is read.
let sessions_empty = {
905+
let sessions = self.df_sessions.read().await;
906+
sessions.is_empty()
907+
};
908+
let queries_empty = self.queries.count() == 0;
909+
let idle_now = sessions_empty && queries_empty;
910+
match (idle_now, idle_since) {
911+
(true, None) => {
912+
// just entered idle
913+
idle_since = Some(std::time::Instant::now());
914+
}
915+
(true, Some(since)) => {
916+
if since.elapsed() >= idle_timeout {
917+
// stayed idle long enough
918+
return;
919+
}
920+
}
921+
(false, _) => {
922+
// became active again, reset the idle window
923+
idle_since = None;
924+
}
925+
}
926+
}
927+
}
894928
}
895929

896930
//Test environment

crates/embucketd/src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,14 @@ pub struct CliOpts {
284284
help = "Tracing span processor"
285285
)]
286286
pub tracing_span_processor: TracingSpanProcessor,
287+
288+
#[arg(
289+
long,
290+
env = "IDLE_TIMEOUT_SECONDS",
291+
default_value = "18000",
292+
help = "Service idle timeout in seconds"
293+
)]
294+
pub timeout: Option<u64>,
287295
}
288296

289297
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]

crates/embucketd/src/main.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use axum::{
3434
routing::{get, post},
3535
};
3636
use clap::Parser;
37-
use core_executor::service::CoreExecutionService;
37+
use core_executor::service::{
38+
CoreExecutionService, ExecutionService, TIMEOUT_SIGNAL_INTERVAL_SECONDS,
39+
};
3840
use core_executor::utils::Config as ExecutionConfig;
3941
use core_history::SlateDBHistoryStore;
4042
use core_metastore::SlateDBMetastore;
@@ -345,8 +347,13 @@ async fn async_main(
345347
.expect("Failed to bind to address");
346348
let addr = listener.local_addr().expect("Failed to get local address");
347349
tracing::info!(%addr, "Listening on http");
350+
let timeout = opts.timeout.unwrap();
348351
axum::serve(listener, router)
349-
.with_graceful_shutdown(shutdown_signal(Arc::new(db.clone())))
352+
.with_graceful_shutdown(shutdown_signal(
353+
Arc::new(db.clone()),
354+
execution_svc.clone(),
355+
timeout,
356+
))
350357
.await
351358
.expect("Failed to start server");
352359

@@ -469,7 +476,7 @@ fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider {
469476
clippy::redundant_pub_crate,
470477
clippy::cognitive_complexity
471478
)]
472-
async fn shutdown_signal(db: Arc<Db>) {
479+
async fn shutdown_signal(db: Arc<Db>, execution_svc: Arc<dyn ExecutionService>, timeout: u64) {
473480
let ctrl_c = async {
474481
signal::ctrl_c()
475482
.await
@@ -487,6 +494,11 @@ async fn shutdown_signal(db: Arc<Db>) {
487494
#[cfg(not(unix))]
488495
let terminate = std::future::pending::<()>();
489496

497+
let timeout = execution_svc.timeout_signal(
498+
tokio::time::Duration::from_secs(TIMEOUT_SIGNAL_INTERVAL_SECONDS),
499+
tokio::time::Duration::from_secs(timeout),
500+
);
501+
490502
tokio::select! {
491503
() = ctrl_c => {
492504
db.close().await.expect("Failed to close database");
@@ -496,6 +508,9 @@ async fn shutdown_signal(db: Arc<Db>) {
496508
db.close().await.expect("Failed to close database");
497509
tracing::warn!("SIGTERM received, starting graceful shutdown");
498510
},
511+
() = timeout => {
512+
tracing::warn!("No sessions in use & no running queries - timeout, starting graceful shutdown");
513+
}
499514
}
500515

501516
tracing::warn!("signal received, starting graceful shutdown");
@@ -513,3 +528,54 @@ fn load_openapi_spec() -> Option<openapi::OpenApi> {
513528
original_spec.paths = openapi::Paths::new();
514529
Some(original_spec)
515530
}
531+
532+
// Tests for the idle-timeout shutdown signal exposed by the execution service.
#[cfg(test)]
533+
mod tests {
534+
use api_sessions::session::SessionStore;
535+
use core_executor::models::QueryContext;
536+
use core_executor::service::ExecutionService;
537+
use core_executor::service::make_test_execution_svc;
538+
use core_executor::session::to_unix;
539+
use std::sync::atomic::Ordering;
540+
use std::time::Duration;
541+
use time::OffsetDateTime;
542+
543+
// Drives `timeout_signal` to completion: a session is created and used for one
// query, its expiry is set to the current time, a background task deletes
// expired sessions every second, and the test then awaits `timeout_signal`
// with a 1 s polling interval and a 3 s idle timeout.
//
// NOTE(review): there are no assertions; the test passes when `timeout_signal`
// returns and would hang rather than fail if the service never became idle.
// It also runs on real time (SLEEP(5) plus the 3 s idle window) - confirm that
// is acceptable for the suite.
#[tokio::test]
544+
#[allow(clippy::expect_used, clippy::too_many_lines)]
545+
async fn test_timeout_signal() {
546+
let execution_svc = make_test_execution_svc().await;
547+
548+
let df_session_id = "fasfsafsfasafsass".to_string();
549+
let user_session = execution_svc
550+
.create_session(&df_session_id)
551+
.await
552+
.expect("Failed to create a session");
553+
554+
// The query is awaited here, before the timeout signal is started.
execution_svc
555+
.query(&df_session_id, "SELECT SLEEP(5)", QueryContext::default())
556+
.await
557+
.expect("Failed to execute query (session deleted)");
558+
559+
// Overwrite the session expiry with "now"; presumably this makes the session
// eligible for removal by the deletion task below - verify against SessionStore.
user_session
560+
.expiry
561+
.store(to_unix(OffsetDateTime::now_utc()), Ordering::Relaxed);
562+
563+
let session_store = SessionStore::new(execution_svc.clone());
564+
565+
// Background task that checks for expired sessions once per second; its
// JoinHandle is dropped, so the task is detached.
tokio::task::spawn({
566+
let session_store = session_store.clone();
567+
async move {
568+
session_store
569+
.continuously_delete_expired(Duration::from_secs(1))
570+
.await;
571+
}
572+
});
573+
574+
// NOTE(review): the `select!` below has a single arm, so it behaves like a
// plain `timeout.await`.
let timeout = execution_svc.timeout_signal(Duration::from_secs(1), Duration::from_secs(3));
575+
tokio::select! {
576+
() = timeout => {
577+
tracing::warn!("No sessions in use & no running queries - timeout, starting graceful shutdown");
578+
}
579+
}
580+
}
581+
}

0 commit comments

Comments
 (0)