Skip to content

Commit 5d037e5

Browse files
committed
impl query result caching
1 parent 137a94e commit 5d037e5

File tree

4 files changed

+389
-5
lines changed

4 files changed

+389
-5
lines changed

server/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ build = "build.rs"
99

1010
[dependencies]
1111
### apache arrow/datafusion dependencies
12+
# arrow = "51.0.0"
1213
arrow-schema = { version = "51.0.0", features = ["serde"] }
1314
arrow-array = { version = "51.0.0" }
1415
arrow-json = "51.0.0"
1516
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
1617
arrow-select = "51.0.0"
1718
datafusion = "37.1.0"
18-
object_store = { version = "0.9.1", features = ["cloud", "aws"] }
19+
object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
1920
parquet = "51.0.0"
2021
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
2122
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
@@ -72,11 +73,11 @@ relative-path = { version = "1.7", features = ["serde"] }
7273
reqwest = { version = "0.11.27", default_features = false, features = [
7374
"rustls-tls",
7475
"json",
75-
] }
76-
rustls = "0.22.4"
76+
] } # cannot update cause rustls is not latest `see rustls`
77+
rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
7778
rustls-pemfile = "2.1.2"
7879
semver = "1.0"
79-
serde = { version = "1.0", features = ["rc"] }
80+
serde = { version = "1.0", features = ["rc", "derive"] }
8081
serde_json = "1.0"
8182
static-files = "0.2"
8283
sysinfo = "0.30.11"

server/src/handlers/http/query.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ use crate::event::error::EventError;
3434
use crate::handlers::http::fetch_schema;
3535

3636
use crate::event::commit_schema;
37+
use crate::localcache::CacheError;
3738
use crate::metrics::QUERY_EXECUTE_TIME;
3839
use crate::option::{Mode, CONFIG};
3940
use crate::query::error::ExecuteError;
4041
use crate::query::Query as LogicalQuery;
4142
use crate::query::{TableScanVisitor, QUERY_SESSION};
43+
use crate::querycache::QueryCacheManager;
4244
use crate::rbac::role::{Action, Permission};
4345
use crate::rbac::Users;
4446
use crate::response::QueryResponse;
@@ -50,7 +52,7 @@ use crate::utils::actix::extract_session_key_from_req;
5052
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5153
#[serde(rename_all = "camelCase")]
5254
pub struct Query {
53-
pub query: String,
55+
pub pub query: String,
5456
pub start_time: String,
5557
pub end_time: String,
5658
#[serde(default)]
@@ -72,6 +74,36 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7274
// create a visitor to extract the table name
7375
let mut visitor = TableScanVisitor::default();
7476
let _ = raw_logical_plan.visit(&mut visitor);
77+
let stream = visitor.top();
78+
let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
79+
.await
80+
.unwrap_or(None);
81+
82+
if let Some(query_cache_manager) = query_cache_manager {
83+
let mut query_cache = query_cache_manager.get_cache(stream).await?;
84+
85+
let (start, end) = parse_human_time(&query_request.start_time, &query_request.end_time)?;
86+
let key = format!(
87+
"{}-{}-{}",
88+
start.to_rfc3339(),
89+
end.to_rfc3339(),
90+
query_request.query.clone()
91+
);
92+
93+
let file_path = query_cache.get_file(key);
94+
if let Some(file_path) = file_path {
95+
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
96+
let response = QueryResponse {
97+
records,
98+
fields,
99+
fill_null: query_request.send_null,
100+
with_fields: query_request.fields,
101+
}
102+
.to_http()?;
103+
104+
return Ok(response);
105+
}
106+
};
75107

76108
let tables = visitor.into_inner();
77109

@@ -99,6 +131,18 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
99131
let time = Instant::now();
100132

101133
let (records, fields) = query.execute(table_name.clone()).await?;
134+
// put the rbs to parquet
135+
if let Some(query_cache_manager) = query_cache_manager {
136+
query_cache_manager
137+
.create_parquet_cache(
138+
&table_name,
139+
&records,
140+
query.start.to_rfc3339(),
141+
query.end.to_rfc3339(),
142+
query_request.query,
143+
)
144+
.await?;
145+
}
102146

103147
let response = QueryResponse {
104148
records,

server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod migration;
3232
mod oidc;
3333
mod option;
3434
mod query;
35+
mod querycache;
3536
mod rbac;
3637
mod response;
3738
mod static_schema;

0 commit comments

Comments
 (0)