Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl ParseableServer for QueryServer {
config
.service(
web::scope(&base_path())
.service(Server::get_date_bin())
.service(Server::get_correlation_webscope())
.service(Server::get_query_factory())
.service(Server::get_liveness_factory())
Expand All @@ -65,6 +64,7 @@ impl ParseableServer for QueryServer {
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Server::get_counts_webscope())
.service(Server::get_metrics_webscope())
.service(Self::get_cluster_web_scope()),
)
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ParseableServer for Server {
.service(Self::get_llm_webscope())
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Self::get_date_bin())
.service(Self::get_counts_webscope())
.service(Self::get_metrics_webscope()),
)
.service(Self::get_ingest_otel_factory())
Expand Down Expand Up @@ -267,9 +267,8 @@ impl Server {
),
)
}
pub fn get_date_bin() -> Resource {
web::resource("/datebin")
.route(web::post().to(query::get_date_bin).authorize(Action::Query))
pub fn get_counts_webscope() -> Resource {
web::resource("/counts").route(web::post().to(query::get_counts).authorize(Action::Query))
}

// get the query factory
Expand Down
29 changes: 16 additions & 13 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand Down Expand Up @@ -104,21 +104,24 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_query(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)`` queries and use the counts API
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let date_bin_request = DateBinRequest {
let counts_req = CountsRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let date_bin_records = date_bin_request.get_bin_density().await?;
let count_records = counts_req.get_bin_density().await?;
// NOTE: this should not panic, since there is atleast one bin, always
let log_count = count_records[0].log_count;
let response = if query_request.fields {
json!({
"fields": vec![&column_name],
"records": vec![json!({column_name: date_bin_records[0].log_count})]
"fields": [&column_name],
"records": [json!({column_name: log_count})]
})
} else {
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
Value::Array(vec![json!({column_name: log_count})])
};

let time = time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -148,21 +151,21 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
Ok(response)
}

pub async fn get_date_bin(
pub async fn get_counts(
req: HttpRequest,
date_bin: Json<DateBinRequest>,
counts_request: Json<CountsRequest>,
) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

// does user have access to table?
user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;
user_auth_for_query(&permissions, &[counts_request.stream.clone()])?;

let date_bin_records = date_bin.get_bin_density().await?;
let records = counts_request.get_bin_density().await?;

Ok(web::Json(DateBinResponse {
fields: vec!["date_bin_timestamp".into(), "log_count".into()],
records: date_bin_records,
Ok(web::Json(CountsResponse {
fields: vec!["counts_timestamp".into(), "log_count".into()],
records,
}))
}

Expand Down
64 changes: 36 additions & 28 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,33 +244,39 @@ impl Query {
}
}

/// DateBinRecord
/// Record of counts for a given time bin.
#[derive(Debug, Serialize, Clone)]
pub struct DateBinRecord {
pub date_bin_timestamp: String,
pub struct CountsRecord {
/// Start time of the bin
pub counts_timestamp: String,
/// Number of logs in the bin
pub log_count: u64,
}

struct DateBinBounds {
struct TimeBounds {
start: DateTime<Utc>,
end: DateTime<Utc>,
}

/// DateBin Request.
/// Request for counts, received from API/SQL query.
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DateBinRequest {
pub struct CountsRequest {
/// Name of the stream to get counts for
pub stream: String,
/// Included start time for counts query
pub start_time: String,
/// Excluded end time for counts query
pub end_time: String,
/// Number of bins to divide the time range into
pub num_bins: u64,
}

impl DateBinRequest {
impl CountsRequest {
/// This function is supposed to read maninfest files for the given stream,
/// get the sum of `num_rows` between the `startTime` and `endTime`,
/// divide that by number of bins and return in a manner acceptable for the console
pub async fn get_bin_density(&self) -> Result<Vec<DateBinRecord>, QueryError> {
pub async fn get_bin_density(&self) -> Result<Vec<CountsRecord>, QueryError> {
let time_partition = STREAM_INFO
.get_time_partition(&self.stream.clone())
.map_err(|err| anyhow::Error::msg(err.to_string()))?
Expand All @@ -279,22 +285,22 @@ impl DateBinRequest {
// get time range
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
let all_manifest_files = get_manifest_list(&self.stream, &time_range).await?;
// get final date bins
let final_date_bins = self.get_bins(&time_range);
// get bounds
let counts = self.get_bounds(&time_range);

// we have start and end times for each bin
// we also have all the manifest files for the given time range
// now we iterate over start and end times for each bin
// then we iterate over the manifest files which are within that time range
// we sum up the num_rows
let mut date_bin_records = Vec::new();
let mut counts_records = Vec::new();

for bin in final_date_bins {
for bin in counts {
// extract start and end time to compare
let date_bin_timestamp = bin.start.timestamp_millis();
let counts_timestamp = bin.start.timestamp_millis();

// Sum up the number of rows that fall within the bin
let total_num_rows: u64 = all_manifest_files
let log_count: u64 = all_manifest_files
.iter()
.flat_map(|m| &m.files)
.filter_map(|f| {
Expand All @@ -315,18 +321,18 @@ impl DateBinRequest {
})
.sum();

date_bin_records.push(DateBinRecord {
date_bin_timestamp: DateTime::from_timestamp_millis(date_bin_timestamp)
counts_records.push(CountsRecord {
counts_timestamp: DateTime::from_timestamp_millis(counts_timestamp)
.unwrap()
.to_rfc3339(),
log_count: total_num_rows,
log_count,
});
}
Ok(date_bin_records)
Ok(counts_records)
}

/// Calculate the end time for each bin based on the number of bins
fn get_bins(&self, time_range: &TimeRange) -> Vec<DateBinBounds> {
fn get_bounds(&self, time_range: &TimeRange) -> Vec<TimeBounds> {
let total_minutes = time_range
.end
.signed_duration_since(time_range.start)
Expand All @@ -337,9 +343,9 @@ impl DateBinRequest {
let remainder = total_minutes % self.num_bins;
let have_remainder = remainder > 0;

// now create multiple bins [startTime, endTime)
// now create multiple bounds [startTime, endTime)
// Should we exclude the last one???
let mut final_date_bins = vec![];
let mut bounds = vec![];

let mut start = time_range.start;

Expand All @@ -352,32 +358,34 @@ impl DateBinRequest {
// Create bins for all but the last date
for _ in 0..loop_end {
let end = start + Duration::minutes(quotient as i64);
final_date_bins.push(DateBinBounds { start, end });
bounds.push(TimeBounds { start, end });
start = end;
}

// Add the last bin, accounting for any remainder, should we include it?
if have_remainder {
final_date_bins.push(DateBinBounds {
bounds.push(TimeBounds {
start,
end: start + Duration::minutes(remainder as i64),
});
} else {
final_date_bins.push(DateBinBounds {
bounds.push(TimeBounds {
start,
end: start + Duration::minutes(quotient as i64),
});
}

final_date_bins
bounds
}
}

/// DateBin Response.
/// Response for the counts API
#[derive(Debug, Serialize, Clone)]
pub struct DateBinResponse {
pub struct CountsResponse {
/// Fields in the log stream
pub fields: Vec<String>,
pub records: Vec<DateBinRecord>,
/// Records in the response
pub records: Vec<CountsRecord>,
}

#[derive(Debug, Default)]
Expand Down
Loading