Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ impl ParseableServer for QueryServer {
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
.service(Server::get_prism_logstream())
.service(Server::get_prism_datasets()),
)
.service(Server::get_generated());
}
Expand Down
14 changes: 13 additions & 1 deletion src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ impl ParseableServer for Server {
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
.service(Server::get_prism_logstream())
.service(Server::get_prism_datasets()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
Expand Down Expand Up @@ -180,6 +181,17 @@ impl Server {
)
}

/// Builds the `/datasets` scope: one POST route handled by
/// `post_datasets`, guarded by the per-stream authorizations the
/// combined response requires (info, stats and retention).
pub fn get_prism_datasets() -> Scope {
    let datasets_route = web::post()
        .to(http::prism_logstream::post_datasets)
        .authorize_for_stream(Action::GetStreamInfo)
        .authorize_for_stream(Action::GetStats)
        .authorize_for_stream(Action::GetRetention);
    web::scope("/datasets").route("", datasets_route)
}

pub fn get_metrics_webscope() -> Scope {
web::scope("/metrics").service(
web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
Expand Down
20 changes: 17 additions & 3 deletions src/handlers/http/prism_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,29 @@
*/

use actix_web::{
web::{self, Path},
Responder,
web::{self, Json, Path},
HttpRequest, Responder,
};

use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError};
use crate::{
prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError},
utils::actix::extract_session_key_from_req,
};

/// This API is essentially just combining the responses of /info and /schema together
pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
    // Delegate to the shared helper and wrap the successful result as JSON;
    // errors propagate unchanged as `PrismLogstreamError`.
    get_prism_logstream_info(&stream_name).await.map(web::Json)
}

/// A combination of /stats, /retention, /hottier, /info, /counts and /query
pub async fn post_datasets(
    Json(prism_req): Json<PrismDatasetRequest>,
    req: HttpRequest,
) -> Result<impl Responder, PrismLogstreamError> {
    // Resolve the caller's session before dispatching; authorization per
    // stream happens inside `get_datasets`.
    let session_key = extract_session_key_from_req(&req)?;
    Ok(web::Json(prism_req.get_datasets(session_key).await?))
}
190 changes: 188 additions & 2 deletions src/prism/logstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use actix_web::http::header::ContentType;
use arrow_schema::Schema;
use chrono::Utc;
use http::StatusCode;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tracing::{debug, warn};

use crate::{
handlers::http::{
Expand All @@ -31,11 +33,18 @@ use crate::{
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
},
logstream::error::StreamError,
query::update_schema_when_distributed,
query::{into_query, update_schema_when_distributed, Query, QueryError},
},
hottier::{HotTierError, HotTierManager, StreamHotTier},
parseable::{StreamNotFound, PARSEABLE},
query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION},
rbac::{map::SessionKey, role::Action, Users},
stats,
storage::{retention::Retention, StreamInfo, StreamType},
utils::{
arrow::record_batches_to_json,
time::{TimeParseError, TimeRange},
},
LOCK_EXPECT,
};

Expand Down Expand Up @@ -185,6 +194,168 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
Ok(stream_info)
}

/// Response structure for Prism dataset queries.
/// Contains information about a stream, its statistics, retention policy,
/// and query results.
///
/// One instance is produced per successfully processed stream; the
/// `/datasets` endpoint returns a JSON array of these.
#[derive(Serialize)]
pub struct PrismDatasetResponse {
    /// Basic information about the stream
    info: StreamInfo,
    /// Statistics for the queried timeframe
    stats: QueriedStats,
    /// Retention policy details
    retention: Retention,
    /// Hot tier information if available (serializes as `null` when absent)
    hottier: Option<StreamHotTier>,
    /// Count of records in the specified time range
    counts: CountsResponse,
    /// Collection of distinct values for source identifiers
    /// (an object of the form `{"ips": [...], "user_agents": [...]}`;
    /// either list may be `null` when the underlying query failed)
    distinct_sources: Value,
}

/// Request parameters for retrieving Prism dataset information.
/// Defines which streams to query and the time range for the query.
///
/// Because of `rename_all = "camelCase"`, the JSON body uses
/// `startTime` / `endTime` as key names (`streams` is unaffected).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrismDatasetRequest {
    /// List of stream names to query; when empty or omitted, all streams
    /// the caller is authorized for are queried (see `get_datasets`)
    #[serde(default)]
    streams: Vec<String>,
    /// ISO 8601 formatted start time or human-readable time expression
    start_time: String,
    /// ISO 8601 formatted end time or human-readable time expression
    end_time: String,
}

impl PrismDatasetRequest {
    /// Retrieves dataset information for all specified streams.
    ///
    /// Processes each stream in the request and compiles their information.
    /// Streams that don't exist or can't be accessed are skipped.
    ///
    /// # Returns
    /// - `Ok(Vec<PrismDatasetResponse>)`: List of responses for successfully processed streams
    /// - `Err(PrismLogstreamError)`: If a critical error occurs during processing
    ///
    /// # Note
    /// 1. This method won't fail if individual streams fail - it will only include
    ///    successfully processed streams in the result.
    /// 2. On receiving an empty stream list, we return for all streams the user is able to query for
    pub async fn get_datasets(
        mut self,
        key: SessionKey,
    ) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
        // An empty request means "every stream"; RBAC filtering below trims
        // it down to what the caller may actually see.
        if self.streams.is_empty() {
            self.streams = PARSEABLE.streams.list();
        }

        let mut responses = vec![];
        for stream in self.streams.iter() {
            // Skip unauthorized streams instead of failing the whole request.
            if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
                != crate::rbac::Response::Authorized
            {
                warn!("Unauthorized access requested for stream: {stream}");
                continue;
            }

            // NOTE(review): this assumes `check_or_load_stream` returns `true`
            // when the stream is missing/unloadable — confirm against its
            // definition, as the name could also read as "true when present".
            if PARSEABLE.check_or_load_stream(&stream).await {
                debug!("Stream not found: {stream}");
                continue;
            }

            // Combined /info, /schema, /stats and /retention payload.
            let PrismLogstreamInfo {
                info,
                stats,
                retention,
                ..
            } = get_prism_logstream_info(&stream).await?;

            // Hot tier details are only available when a manager is configured.
            let hottier = match HotTierManager::global() {
                Some(hot_tier_manager) => {
                    let stats = hot_tier_manager.get_hot_tier(&stream).await?;
                    Some(stats)
                }
                _ => None,
            };

            // A single bin yields the total event count over the window.
            let records = CountsRequest {
                stream: stream.clone(),
                start_time: self.start_time.clone(),
                end_time: self.end_time.clone(),
                num_bins: 1,
            }
            .get_bin_density()
            .await?;
            let counts = CountsResponse {
                fields: vec!["start_time".into(), "end_time".into(), "count".into()],
                records,
            };

            // Retrieve distinct values for source identifiers.
            // `.ok()` downgrades per-field query failures (e.g. the column is
            // absent from this stream) to `null` in the response.
            let ips = self.get_distinct_entries(&stream, "p_src_ip").await.ok();
            let user_agents = self
                .get_distinct_entries(&stream, "p_user_agent")
                .await
                .ok();

            responses.push(PrismDatasetResponse {
                info,
                stats,
                retention,
                hottier,
                counts,
                distinct_sources: json!({
                    "ips": ips,
                    "user_agents": user_agents
                }),
            })
        }

        Ok(responses)
    }

    /// Retrieves distinct values for a specific field in a stream.
    ///
    /// # Parameters
    /// - `stream_name`: Name of the stream to query
    /// - `field`: Field name to get distinct values for
    ///
    /// # Returns
    /// - `Ok(Vec<String>)`: List of distinct values found for the field
    /// - `Err(QueryError)`: If the query fails or field doesn't exist
    async fn get_distinct_entries(
        &self,
        stream_name: &str,
        field: &str,
    ) -> Result<Vec<String>, QueryError> {
        let query = Query {
            // FIX: the original query used `FOR {stream_name}`, which is not
            // valid SQL — DataFusion's parser rejects it, so every call failed
            // at parse time and was silently swallowed by `.ok()` at the call
            // site, leaving `distinct_sources` permanently null. Use `FROM`,
            // and quote both identifiers so DataFusion does not case-normalize
            // or choke on special characters in stream names.
            query: format!("SELECT DISTINCT(\"{field}\") FROM \"{stream_name}\""),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            send_null: false,
            filter_tags: None,
            fields: true,
        };
        let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;

        let session_state = QUERY_SESSION.state();
        let query = into_query(&query, &session_state, time_range).await?;
        let (records, _) = execute(query, stream_name).await?;
        let response = record_batches_to_json(&records)?;
        // Extract the requested column from each JSON row; rows where the
        // field is missing or not a string are skipped.
        let values = response
            .iter()
            .flat_map(|row| {
                row.get(field)
                    .and_then(|s| s.as_str())
                    .map(|s| s.to_string())
            })
            .collect();

        Ok(values)
    }
}

#[derive(Debug, thiserror::Error)]
pub enum PrismLogstreamError {
#[error("Error: {0}")]
Expand All @@ -193,6 +364,16 @@ pub enum PrismLogstreamError {
StreamError(#[from] StreamError),
#[error("StreamNotFound: {0}")]
StreamNotFound(#[from] StreamNotFound),
#[error("Hottier: {0}")]
Hottier(#[from] HotTierError),
#[error("Query: {0}")]
Query(#[from] QueryError),
#[error("TimeParse: {0}")]
TimeParse(#[from] TimeParseError),
#[error("Execute: {0}")]
Execute(#[from] ExecuteError),
#[error("Auth: {0}")]
Auth(#[from] actix_web::Error),
}

impl actix_web::ResponseError for PrismLogstreamError {
Expand All @@ -201,6 +382,11 @@ impl actix_web::ResponseError for PrismLogstreamError {
PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::StreamError(e) => e.status_code(),
PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ pub struct QueryResponse {
impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records)?;
let mut json_records = record_batches_to_json(&self.records)?;

if self.fill_null {
for map in &mut json_records {
Expand Down
8 changes: 5 additions & 3 deletions src/utils/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ pub fn replace_columns(
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
let mut writer = arrow_json::ArrayWriter::new(buf);
writer.write_batches(records)?;
for record in records {
writer.write(record)?;
}
writer.finish()?;

let buf = writer.into_inner();
Expand Down Expand Up @@ -188,7 +190,7 @@ mod tests {
#[test]
fn check_empty_json_to_record_batches() {
    // A record batch with an empty schema and zero rows must serialize
    // to an empty list of JSON objects.
    let empty = RecordBatch::new_empty(Arc::new(Schema::empty()));
    let json_rows = record_batches_to_json(&[empty]).unwrap();
    assert!(json_rows.is_empty());
}
Expand Down
Loading