Skip to content

Commit 0178f27

Browse files
committed
feat: Impl Query Server
1. send_request_ingest_server now returns a proper result 2. fix: bug when there are no parquet files in the store 3. Query Server now gets all the resultant data
1 parent eaa53b5 commit 0178f27

File tree

14 files changed

+702
-122
lines changed

14 files changed

+702
-122
lines changed

server/src/handlers/http.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ pub(crate) mod query;
3535
pub(crate) mod rbac;
3636
pub(crate) mod role;
3737

38-
// this needs to be removed from here. It is in modal->mod.rs
39-
// include!(concat!(env!("OUT_DIR"), "/generated.rs"));
40-
4138
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
4239
pub const API_BASE_PATH: &str = "/api";
4340
pub const API_VERSION: &str = "v1";
@@ -58,23 +55,51 @@ pub(crate) fn cross_origin_config() -> Cors {
5855
}
5956
}
6057

61-
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
62-
// send the query request to the ingestor
58+
pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
6359
let mut res = vec![];
64-
let ima = QueryServer::get_ingestor_info().await.unwrap();
60+
let ima = QueryServer::get_ingester_info().await.unwrap();
6561

6662
for im in ima {
67-
let uri = format!("{}{}/{}",im.domain_name, base_path(), "query");
63+
let uri = format!("{}api/v1/logstream/{}/schema", im.domain_name, stream_name);
64+
let reqw = reqwest::Client::new()
65+
.get(uri)
66+
.header(http::header::AUTHORIZATION, im.token.clone())
67+
.header(http::header::CONTENT_TYPE, "application/json")
68+
.send()
69+
.await?;
70+
71+
if reqw.status().is_success() {
72+
let v = serde_json::from_slice(&reqw.bytes().await?)?;
73+
res.push(v);
74+
}
75+
}
76+
77+
Ok(res)
78+
}
79+
80+
pub async fn send_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
81+
// send the query request to the ingestor
82+
let mut res = vec![];
83+
let ima = QueryServer::get_ingester_info().await.unwrap();
84+
85+
for im in ima.iter() {
86+
let uri = format!("{}api/v1/{}", im.domain_name, "query");
87+
6888
let reqw = reqwest::Client::new()
6989
.post(uri)
7090
.json(query)
71-
.basic_auth("admin", Some("admin"))
91+
.header(http::header::AUTHORIZATION, im.token.clone())
92+
.header(http::header::CONTENT_TYPE, "application/json")
7293
.send()
7394
.await?;
7495

7596
if reqw.status().is_success() {
7697
let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
77-
res.push(v);
98+
if let Some(arr) = v.as_array() {
99+
for val in arr {
100+
res.push(val.clone())
101+
}
102+
}
78103
}
79104
}
80105

server/src/handlers/http/ingest.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::handlers::{
3232
STREAM_NAME_HEADER_KEY,
3333
};
3434
use crate::metadata::STREAM_INFO;
35+
use crate::option::{Mode, CONFIG};
3536
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3637

3738
use super::logstream::error::CreateStreamError;
@@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
140141
if STREAM_INFO.stream_exists(stream_name) {
141142
return Ok(());
142143
}
143-
super::logstream::create_stream(stream_name.to_string()).await?;
144+
145+
match &CONFIG.parseable.mode {
146+
Mode::All | Mode::Query => {
147+
super::logstream::create_stream(stream_name.to_string()).await?;
148+
}
149+
Mode::Ingest => {
150+
return Err(PostError::Invalid(anyhow::anyhow!(
151+
"Stream {} not found. Has it been created?",
152+
stream_name
153+
)));
154+
}
155+
}
144156
Ok(())
145157
}
146158

server/src/handlers/http/logstream.rs

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ use serde_json::Value;
2525

2626
use crate::alerts::Alerts;
2727
use crate::metadata::STREAM_INFO;
28-
use crate::option::CONFIG;
28+
use crate::option::{Mode, CONFIG};
2929
use crate::storage::retention::Retention;
3030
use crate::storage::{LogStream, StorageDir};
3131
use crate::{catalog, event, stats};
3232
use crate::{metadata, validator};
3333

3434
use self::error::{CreateStreamError, StreamError};
3535

36+
use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};
37+
3638
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3739
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
3840

@@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
111113

112114
pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
113115
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
114-
115116
if metadata::STREAM_INFO.stream_exists(&stream_name) {
116117
// Error if the log stream already exists
117118
return Err(StreamError::Custom {
@@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
121122
status: StatusCode::BAD_REQUEST,
122123
});
123124
}
124-
create_stream(stream_name).await?;
125+
if CONFIG.parseable.mode == Mode::Query {
126+
query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?;
127+
}
128+
129+
create_stream(stream_name.clone()).await?;
125130

126131
Ok(("log stream created", StatusCode::OK))
127132
}
@@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
279284
let stats = stats::get_current_stats(&stream_name, "json")
280285
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
281286

287+
if CONFIG.parseable.mode == Mode::Query {
288+
let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
289+
let stats = serde_json::to_value(stats).unwrap();
290+
return Ok((web::Json(stats), StatusCode::OK));
291+
}
292+
282293
let hash_map = STREAM_INFO.read().unwrap();
283294
let stream_meta = &hash_map
284295
.get(&stream_name)
285296
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
286297

287298
let time = Utc::now();
299+
let qstats = match &stream_meta.first_event_at {
300+
Some(first_event_at) => {
301+
let ingestion_stats = IngestionStats::new(
302+
stats.events,
303+
format!("{} {}", stats.ingestion, "Bytes"),
304+
"json",
305+
);
306+
let storage_stats =
307+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
308+
309+
QueriedStats::new(
310+
&stream_name,
311+
&stream_meta.created_at,
312+
Some(first_event_at.to_owned()),
313+
time,
314+
ingestion_stats,
315+
storage_stats,
316+
)
317+
}
288318

289-
let stats = serde_json::json!({
290-
"stream": stream_name,
291-
"creation_time": &stream_meta.created_at,
292-
"first_event_at": Some(&stream_meta.first_event_at),
293-
"time": time,
294-
"ingestion": {
295-
"count": stats.events,
296-
"size": format!("{} {}", stats.ingestion, "Bytes"),
297-
"format": "json"
298-
},
299-
"storage": {
300-
"size": format!("{} {}", stats.storage, "Bytes"),
301-
"format": "parquet"
319+
// ? this case should not happen
320+
None => {
321+
let ingestion_stats = IngestionStats::new(
322+
stats.events,
323+
format!("{} {}", stats.ingestion, "Bytes"),
324+
"json",
325+
);
326+
let storage_stats =
327+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
328+
329+
QueriedStats::new(
330+
&stream_name,
331+
&stream_meta.created_at,
332+
Some('0'.to_string()),
333+
time,
334+
ingestion_stats,
335+
storage_stats,
336+
)
302337
}
303-
});
338+
};
339+
340+
let out_stats = serde_json::to_value(qstats).unwrap();
304341

305-
Ok((web::Json(stats), StatusCode::OK))
342+
Ok((web::Json(out_stats), StatusCode::OK))
306343
}
307344

308345
// Check if the first_event_at is empty
@@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
345382
let created_at = stream_meta.unwrap().created_at;
346383

347384
metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
348-
349385
Ok(())
350386
}
351387

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::analytics;
2020
use crate::banner;
2121
use crate::handlers::http::logstream;
2222
use crate::handlers::http::middleware::RouteExt;
23+
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
2324
use crate::localcache::LocalCacheManager;
2425
use crate::metadata;
2526
use crate::metrics;
@@ -30,8 +31,6 @@ use crate::storage::ObjectStorageError;
3031
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
3132
use crate::sync;
3233

33-
use std::net::SocketAddr;
34-
3534
use super::server::Server;
3635
use super::ssl_acceptor::get_ssl_acceptor;
3736
use super::IngesterMetadata;
@@ -63,8 +62,8 @@ impl ParseableServer for IngestServer {
6362
prometheus: PrometheusMetrics,
6463
_oidc_client: Option<crate::oidc::OpenidConfig>,
6564
) -> anyhow::Result<()> {
66-
// set the ingestor metadata
67-
self.set_ingestor_metadata().await?;
65+
// set the ingester metadata
66+
self.set_ingester_metadata().await?;
6867

6968
// get the ssl stuff
7069
let ssl = get_ssl_acceptor(
@@ -99,11 +98,9 @@ impl ParseableServer for IngestServer {
9998

10099
/// implementation of the init method; it just invokes the initialize method
101100
async fn init(&self) -> anyhow::Result<()> {
102-
// self.validate()?;
103101
self.initialize().await
104102
}
105103

106-
#[allow(unused)]
107104
fn validate(&self) -> anyhow::Result<()> {
108105
if CONFIG.get_storage_mode_string() == "Local drive" {
109106
return Err(anyhow::Error::msg(
@@ -123,8 +120,9 @@ impl IngestServer {
123120
.service(
124121
// Base path "{url}/api/v1"
125122
web::scope(&base_path())
126-
.service(Server::get_query_factory())
127-
.service(Server::get_ingest_factory()),
123+
.service(Server::get_query_factory())
124+
.service(Server::get_ingest_factory())
125+
.service(Self::logstream_api()),
128126
)
129127
.service(Server::get_liveness_factory())
130128
.service(Server::get_readiness_factory())
@@ -152,46 +150,82 @@ impl IngestServer {
152150
)
153151
}
154152

155-
#[inline(always)]
156-
fn get_ingestor_address(&self) -> SocketAddr {
157-
// this might cause an issue down the line
158-
// best is to make the Cli Struct better, but thats a chore
159-
(CONFIG.parseable.address.clone())
160-
.parse::<SocketAddr>()
161-
.unwrap()
153+
fn logstream_api() -> Scope {
154+
web::scope("/logstream")
155+
.service(
156+
// GET "/logstream" ==> Get list of all Log Streams on the server
157+
web::resource("")
158+
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
159+
)
160+
.service(
161+
web::scope("/{logstream}")
162+
.service(
163+
web::resource("")
164+
// PUT "/logstream/{logstream}" ==> Create log stream
165+
.route(
166+
web::put()
167+
.to(logstream::put_stream)
168+
.authorize_for_stream(Action::CreateStream),
169+
)
170+
// DELETE "/logstream/{logstream}" ==> Delete log stream
171+
.route(
172+
web::delete()
173+
.to(logstream::delete)
174+
.authorize_for_stream(Action::DeleteStream),
175+
)
176+
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
177+
)
178+
.service(
179+
// GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
180+
web::resource("/schema").route(
181+
web::get()
182+
.to(logstream::schema)
183+
.authorize_for_stream(Action::GetSchema),
184+
),
185+
)
186+
.service(
187+
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
188+
web::resource("/stats").route(
189+
web::get()
190+
.to(logstream::get_stats)
191+
.authorize_for_stream(Action::GetStats),
192+
),
193+
),
194+
)
162195
}
163196

164-
// create the ingestor metadata and put the .ingestor.json file in the object store
165-
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
197+
// create the ingester metadata and put the .ingester.json file in the object store
198+
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
166199
let store = CONFIG.storage().get_object_store();
167200

168201
// remove ip and go with the domain name
169-
let sock = self.get_ingestor_address();
202+
let sock = Server::get_server_address();
170203
let path = RelativePathBuf::from(format!(
171-
"ingestor.{}.{}.json",
204+
"ingester.{}.{}.json",
172205
sock.ip(), // this might be wrong
173206
sock.port()
174207
));
175208

176209
if store.get_object(&path).await.is_ok() {
177-
println!("Ingestor metadata already exists");
210+
println!("Ingester metadata already exists");
178211
return Ok(());
179212
};
180213

214+
let scheme = CONFIG.parseable.get_scheme();
181215
let resource = IngesterMetadata::new(
182216
sock.port().to_string(),
183217
CONFIG
184218
.parseable
185219
.domain_address
186220
.clone()
187221
.unwrap_or_else(|| {
188-
Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
222+
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
189223
})
190224
.to_string(),
191225
DEFAULT_VERSION.to_string(),
192226
store.get_bucket_name(),
193227
&CONFIG.parseable.username,
194-
&CONFIG.parseable.password, // is this secure?
228+
&CONFIG.parseable.password,
195229
);
196230

197231
let resource = serde_json::to_string(&resource)
@@ -205,7 +239,7 @@ impl IngestServer {
205239
}
206240

207241
// check for querier state. Is it there, or was it there in the past
208-
// this should happen before the set the ingestor metadata
242+
// this should happen before setting the ingester metadata
209243
async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
210244
// how do we check for querier state?
211245
// based on the work flow of the system, the querier will always need to start first

0 commit comments

Comments
 (0)