
Commit 3be89c2

updates
- handled counts request

1 parent: 6d7e0f9

File tree

4 files changed: +117 -37 lines

- src/handlers/http/cluster/mod.rs
- src/handlers/http/query.rs
- src/prism/logstream/mod.rs
- src/query/mod.rs


src/handlers/http/cluster/mod.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -1120,7 +1120,7 @@ struct QuerierStatus {
     last_used: Option<Instant>,
 }
 
-async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
     let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
 
```
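The only change here is visibility: `get_available_querier` becomes `pub`, so code outside the cluster module (the counts path below) can pick a live querier. A rough sketch of the usage this enables, not part of the commit; `forward_sql` is a hypothetical transport helper:

```rust
// Rough sketch: with `pub`, other modules can now select a live querier.
use crate::handlers::http::cluster::get_available_querier;
use crate::handlers::http::query::QueryError;

async fn run_on_any_querier(sql: String) -> Result<(), QueryError> {
    let querier = get_available_querier().await?; // metadata of a live querier node
    // forward_sql(&querier, &sql).await?;        // hypothetical: POST the SQL to that node
    let _ = (querier, sql);
    Ok(())
}
```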

src/handlers/http/query.rs

Lines changed: 14 additions & 6 deletions

```diff
@@ -19,6 +19,7 @@
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use crate::option::Mode;
+use crate::rbac::map::SessionKey;
 use crate::utils::arrow::record_batches_to_json;
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
@@ -43,7 +44,7 @@ use std::time::Instant;
 use tokio::task::JoinSet;
 use tracing::{error, warn};
 
-use crate::event::commit_schema;
+use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::{PARSEABLE, StreamNotFound};
 use crate::query::error::ExecuteError;
@@ -79,7 +80,7 @@ pub struct Query {
 /// TODO: Improve this function and make this a part of the query API
 pub async fn get_records_and_fields(
     query_request: &Query,
-    req: &HttpRequest,
+    creds: &SessionKey,
 ) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
     let session_state = QUERY_SESSION.state();
     let time_range =
@@ -89,8 +90,8 @@ pub async fn get_records_and_fields(
     create_streams_for_distributed(tables.clone()).await?;
 
     let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
-    let creds = extract_session_key_from_req(req)?;
-    let permissions = Users.get_permissions(&creds);
+
+    let permissions = Users.get_permissions(creds);
 
     user_auth_for_datasets(&permissions, &tables).await?;
 
@@ -350,7 +351,12 @@ pub async fn get_counts(
     // if the user has given a sql query (counts call with filters applied), then use this flow
     // this could include filters or group by
     if body.conditions.is_some() {
-        let sql = body.get_df_sql().await?;
+        let time_partition = PARSEABLE
+            .get_stream(&body.stream)?
+            .get_time_partition()
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
+
+        let sql = body.get_df_sql(time_partition).await?;
 
         let query_request = Query {
             query: sql,
@@ -362,7 +368,9 @@
         filter_tags: None,
     };
 
-    let (records, _) = get_records_and_fields(&query_request, &req).await?;
+    let creds = extract_session_key_from_req(&req)?;
+
+    let (records, _) = get_records_and_fields(&query_request, &creds).await?;
 
     if let Some(records) = records {
         let json_records = record_batches_to_json(&records)?;
```
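A sketch of the new calling convention established by these hunks, as it would appear inside query.rs where these items are already in scope (the `example` function itself is illustrative): the handler extracts the `SessionKey` from the `HttpRequest` once, and `get_records_and_fields` receives the key instead of the whole request, which lets non-HTTP callers such as the Prism counts path below reuse it.

```rust
// Illustrative only: extract credentials once, pass the key down.
async fn example(query_request: Query, req: HttpRequest) -> Result<(), QueryError> {
    let creds = extract_session_key_from_req(&req)?;
    let (records, fields) = get_records_and_fields(&query_request, &creds).await?;
    // records/fields would be rendered into the HTTP response here
    let _ = (records, fields);
    Ok(())
}
```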

src/prism/logstream/mod.rs

Lines changed: 94 additions & 16 deletions

```diff
@@ -20,28 +20,35 @@ use std::sync::Arc;
 
 use actix_web::http::header::ContentType;
 use arrow_schema::Schema;
-use chrono::Utc;
+use chrono::{TimeDelta, Utc};
 use http::StatusCode;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
 use tracing::warn;
 
 use crate::{
     LOCK_EXPECT,
+    alerts::alert_structs::{ConditionConfig, Conditions},
+    event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::{
         cluster::{
             fetch_stats_from_ingestors,
             utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
         },
         logstream::error::StreamError,
-        query::{QueryError, update_schema_when_distributed},
+        query::{Query, QueryError, get_records_and_fields, update_schema_when_distributed},
     },
     hottier::{HotTierError, HotTierManager, StreamHotTier},
     parseable::{PARSEABLE, StreamNotFound},
-    query::{CountsRequest, CountsResponse, error::ExecuteError},
+    query::{CountConditions, CountsRequest, CountsResponse, error::ExecuteError},
     rbac::{Users, map::SessionKey, role::Action},
     stats,
     storage::{StreamInfo, StreamType, retention::Retention},
-    utils::time::TimeParseError,
+    utils::{
+        arrow::record_batches_to_json,
+        time::{TimeParseError, truncate_to_minute},
+    },
     validator::error::HotTierValidationError,
 };
 
@@ -216,7 +223,7 @@ pub struct PrismDatasetResponse {
 
 /// Request parameters for retrieving Prism dataset information.
 /// Defines which streams to query
-#[derive(Deserialize, Default)]
+#[derive(Deserialize, Default, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PrismDatasetRequest {
     /// List of stream names to query
@@ -290,7 +297,7 @@ impl PrismDatasetRequest {
 
         // Process stream data
         match get_prism_logstream_info(&stream).await {
-            Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)),
+            Ok(info) => Ok(Some(self.build_dataset_response(stream, info, &key).await?)),
             Err(err) => Err(err),
         }
     }
@@ -310,12 +317,13 @@
         &self,
         stream: String,
         info: PrismLogstreamInfo,
+        key: &SessionKey,
    ) -> Result<PrismDatasetResponse, PrismLogstreamError> {
        // Get hot tier info
        let hottier = self.get_hot_tier_info(&stream).await?;
 
        // Get counts
-        let counts = self.get_counts(&stream).await?;
+        let counts = self.get_counts(&stream, key).await?;
 
        Ok(PrismDatasetResponse {
            stream,
@@ -344,20 +352,84 @@ impl PrismDatasetRequest {
         }
     }
 
-    async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
+    async fn get_counts(
+        &self,
+        stream: &str,
+        key: &SessionKey,
+    ) -> Result<CountsResponse, PrismLogstreamError> {
+        let end = truncate_to_minute(Utc::now());
+        let start = end - TimeDelta::hours(1);
+
+        let conditions = if PARSEABLE.get_stream(stream)?.get_time_partition().is_some() {
+            Some(CountConditions {
+                conditions: Some(Conditions {
+                    operator: Some(crate::alerts::LogicalOperator::And),
+                    condition_config: vec![
+                        ConditionConfig {
+                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            operator: crate::alerts::WhereConfigOperator::GreaterThanOrEqual,
+                            value: Some(start.to_rfc3339()),
+                        },
+                        ConditionConfig {
+                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            operator: crate::alerts::WhereConfigOperator::LessThan,
+                            value: Some(end.to_rfc3339()),
+                        },
+                    ],
+                }),
+                group_by: None,
+            })
+        } else {
+            None
+        };
+
         let count_request = CountsRequest {
             stream: stream.to_owned(),
-            start_time: "1h".to_owned(),
-            end_time: "now".to_owned(),
+            start_time: start.to_rfc3339(),
+            end_time: end.to_rfc3339(),
             num_bins: 10,
-            conditions: None,
+            conditions,
         };
 
-        let records = count_request.get_bin_density().await?;
-        Ok(CountsResponse {
-            fields: vec!["start_time".into(), "end_time".into(), "count".into()],
-            records,
-        })
+        if count_request.conditions.is_some() {
+            // forward request to querier
+            let query = count_request
+                .get_df_sql(DEFAULT_TIMESTAMP_KEY.into())
+                .await?;
+
+            let query_request = Query {
+                query,
+                start_time: start.to_rfc3339(),
+                end_time: end.to_rfc3339(),
+                send_null: true,
+                fields: true,
+                streaming: false,
+                filter_tags: None,
+            };
+
+            let (records, _) = get_records_and_fields(&query_request, key).await?;
+            if let Some(records) = records {
+                let json_records = record_batches_to_json(&records)?;
+                let records = json_records.into_iter().map(Value::Object).collect_vec();
+
+                let res = json!({
+                    "fields": vec!["start_time", "end_time", "count"],
+                    "records": records,
+                });
+
+                Ok(serde_json::from_value(res)?)
+            } else {
+                Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+                    "No data returned for counts SQL",
+                )))
+            }
+        } else {
+            let records = count_request.get_bin_density().await?;
+            Ok(CountsResponse {
+                fields: vec!["start_time".into(), "end_time".into(), "count".into()],
+                records,
+            })
+        }
     }
 }
 
@@ -379,6 +451,10 @@ pub enum PrismLogstreamError {
     Execute(#[from] ExecuteError),
     #[error("Auth: {0}")]
     Auth(#[from] actix_web::Error),
+    #[error("SerdeError: {0}")]
+    SerdeError(#[from] serde_json::Error),
+    #[error("ReqwestError: {0}")]
+    ReqwestError(#[from] reqwest::Error),
 }
 
 impl actix_web::ResponseError for PrismLogstreamError {
@@ -391,6 +467,8 @@ impl actix_web::ResponseError for PrismLogstreamError {
             PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
             PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::SerdeError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
         }
     }
```

src/query/mod.rs

Lines changed: 8 additions & 14 deletions

```diff
@@ -441,37 +441,31 @@ impl CountsRequest {
     }
 
     /// This function will get executed only if self.conditions is some
-    pub async fn get_df_sql(&self) -> Result<String, QueryError> {
+    pub async fn get_df_sql(&self, time_column: String) -> Result<String, QueryError> {
         // unwrap because we have asserted that it is some
         let count_conditions = self.conditions.as_ref().unwrap();
 
-        // get time partition column
-        let time_partition = PARSEABLE
-            .get_stream(&self.stream)?
-            .get_time_partition()
-            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
-
         let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
 
         let dur = time_range.end.signed_duration_since(time_range.start);
 
         let date_bin = if dur.num_minutes() <= 60 * 10 {
             // date_bin 1 minute
             format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                self.stream
             )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
             format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                self.stream
             )
         } else {
             // date_bin 1 day
             format!(
-                "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                self.stream
             )
         };
 
@@ -492,7 +486,7 @@ impl CountsRequest {
     }
 
 /// Response for the counts API
-#[derive(Debug, Serialize, Clone)]
+#[derive(Debug, Serialize, Clone, Deserialize)]
 pub struct CountsResponse {
     /// Fields in the log stream
     pub fields: Vec<String>,
```
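For reference, the bin-width selection that `get_df_sql` applies to the `DATE_BIN` expressions can be restated as a standalone function; the thresholds are taken directly from the hunk above (the function name is mine, not the crate's):

```rust
// Bin width chosen from the query window length, in minutes.
fn bin_width(dur_minutes: i64) -> &'static str {
    if dur_minutes <= 60 * 10 {
        "1 minute" // windows up to 10 hours get minute bins
    } else if dur_minutes < 60 * 240 {
        "1 hour" // 10 hours up to 240 hours (10 days) get hour bins
    } else {
        "1 day" // anything longer gets day bins
    }
}

fn main() {
    assert_eq!(bin_width(60), "1 minute"); // 1-hour window
    assert_eq!(bin_width(60 * 24), "1 hour"); // 1-day window
    assert_eq!(bin_width(60 * 24 * 30), "1 day"); // 30-day window
}
```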
