Skip to content

Commit b663831

Browse files
authored
Impl Ingest Server (#689)
- add DELETE endpoint for stream on IngestServer - add QuriedStats struct - add stats endpoint for logstream on Ingest Server - add guard to create stream automatically during ingest on Ingest Server - api path /logstream on ingest server - make get_ingestor_info func pub - add logstream api endpoint to put - func dir_with_stream to use the proper stream metadata filename - update stream metadata file name - make stream_json_path give the correct file path when in different server modes - Add Guard to disallow the user to run the server in distributed setup with local storage - Other Ingest Servers were not starting if streams existed - If Streams Existed in the data store Server was considering it as stale data - On Startup of a new Ingest Server, Streams were not being synced up properly
1 parent 0b9a277 commit b663831

File tree

13 files changed

+571
-97
lines changed

13 files changed

+571
-97
lines changed

server/src/handlers/http.rs

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,9 +32,6 @@ pub(crate) mod query;
3232
pub(crate) mod rbac;
3333
pub(crate) mod role;
3434

35-
// this needs to be removed from here. It is in modal->mod.rs
36-
// include!(concat!(env!("OUT_DIR"), "/generated.rs"));
37-
3835
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
3936
pub const API_BASE_PATH: &str = "/api";
4037
pub const API_VERSION: &str = "v1";

server/src/handlers/http/ingest.rs

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ use crate::handlers::{
3232
STREAM_NAME_HEADER_KEY,
3333
};
3434
use crate::metadata::STREAM_INFO;
35+
use crate::option::{Mode, CONFIG};
3536
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3637

3738
use super::logstream::error::CreateStreamError;
@@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
140141
if STREAM_INFO.stream_exists(stream_name) {
141142
return Ok(());
142143
}
143-
super::logstream::create_stream(stream_name.to_string()).await?;
144+
145+
match &CONFIG.parseable.mode {
146+
Mode::All | Mode::Query => {
147+
super::logstream::create_stream(stream_name.to_string()).await?;
148+
}
149+
Mode::Ingest => {
150+
return Err(PostError::Invalid(anyhow::anyhow!(
151+
"Stream {} not found. Has it been created?",
152+
stream_name
153+
)));
154+
}
155+
}
144156
Ok(())
145157
}
146158

server/src/handlers/http/logstream.rs

Lines changed: 55 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -25,14 +25,16 @@ use serde_json::Value;
2525

2626
use crate::alerts::Alerts;
2727
use crate::metadata::STREAM_INFO;
28-
use crate::option::CONFIG;
28+
use crate::option::{Mode, CONFIG};
2929
use crate::storage::retention::Retention;
3030
use crate::storage::{LogStream, StorageDir};
3131
use crate::{catalog, event, stats};
3232
use crate::{metadata, validator};
3333

3434
use self::error::{CreateStreamError, StreamError};
3535

36+
use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};
37+
3638
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3739
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
3840

@@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
111113

112114
pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
113115
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
114-
115116
if metadata::STREAM_INFO.stream_exists(&stream_name) {
116117
// Error if the log stream already exists
117118
return Err(StreamError::Custom {
@@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
121122
status: StatusCode::BAD_REQUEST,
122123
});
123124
}
124-
create_stream(stream_name).await?;
125+
if CONFIG.parseable.mode == Mode::Query {
126+
query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?;
127+
}
128+
129+
create_stream(stream_name.clone()).await?;
125130

126131
Ok(("log stream created", StatusCode::OK))
127132
}
@@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
279284
let stats = stats::get_current_stats(&stream_name, "json")
280285
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
281286

287+
if CONFIG.parseable.mode == Mode::Query {
288+
let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
289+
let stats = serde_json::to_value(stats).unwrap();
290+
return Ok((web::Json(stats), StatusCode::OK));
291+
}
292+
282293
let hash_map = STREAM_INFO.read().unwrap();
283294
let stream_meta = &hash_map
284295
.get(&stream_name)
285296
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
286297

287298
let time = Utc::now();
299+
let qstats = match &stream_meta.first_event_at {
300+
Some(first_event_at) => {
301+
let ingestion_stats = IngestionStats::new(
302+
stats.events,
303+
format!("{} {}", stats.ingestion, "Bytes"),
304+
"json",
305+
);
306+
let storage_stats =
307+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
308+
309+
QueriedStats::new(
310+
&stream_name,
311+
&stream_meta.created_at,
312+
Some(first_event_at.to_owned()),
313+
time,
314+
ingestion_stats,
315+
storage_stats,
316+
)
317+
}
288318

289-
let stats = serde_json::json!({
290-
"stream": stream_name,
291-
"creation_time": &stream_meta.created_at,
292-
"first_event_at": Some(&stream_meta.first_event_at),
293-
"time": time,
294-
"ingestion": {
295-
"count": stats.events,
296-
"size": format!("{} {}", stats.ingestion, "Bytes"),
297-
"format": "json"
298-
},
299-
"storage": {
300-
"size": format!("{} {}", stats.storage, "Bytes"),
301-
"format": "parquet"
319+
// ? this case should not happen
320+
None => {
321+
let ingestion_stats = IngestionStats::new(
322+
stats.events,
323+
format!("{} {}", stats.ingestion, "Bytes"),
324+
"json",
325+
);
326+
let storage_stats =
327+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
328+
329+
QueriedStats::new(
330+
&stream_name,
331+
&stream_meta.created_at,
332+
Some('0'.to_string()),
333+
time,
334+
ingestion_stats,
335+
storage_stats,
336+
)
302337
}
303-
});
338+
};
339+
340+
let out_stats = serde_json::to_value(qstats).unwrap();
304341

305-
Ok((web::Json(stats), StatusCode::OK))
342+
Ok((web::Json(out_stats), StatusCode::OK))
306343
}
307344

308345
// Check if the first_event_at is empty
@@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
345382
let created_at = stream_meta.unwrap().created_at;
346383

347384
metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
348-
349385
Ok(())
350386
}
351387

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 50 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use crate::analytics;
2020
use crate::banner;
2121
use crate::handlers::http::logstream;
2222
use crate::handlers::http::middleware::RouteExt;
23+
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
2324
use crate::localcache::LocalCacheManager;
2425
use crate::metadata;
2526
use crate::metrics;
@@ -30,8 +31,6 @@ use crate::storage::ObjectStorageError;
3031
use crate::storage::PARSEABLE_METADATA_FILE_NAME;
3132
use crate::sync;
3233

33-
use std::net::SocketAddr;
34-
3534
use super::server::Server;
3635
use super::ssl_acceptor::get_ssl_acceptor;
3736
use super::IngesterMetadata;
@@ -63,8 +62,8 @@ impl ParseableServer for IngestServer {
6362
prometheus: PrometheusMetrics,
6463
_oidc_client: Option<crate::oidc::OpenidConfig>,
6564
) -> anyhow::Result<()> {
66-
// set the ingestor metadata
67-
self.set_ingestor_metadata().await?;
65+
// set the ingester metadata
66+
self.set_ingester_metadata().await?;
6867

6968
// get the ssl stuff
7069
let ssl = get_ssl_acceptor(
@@ -99,11 +98,9 @@ impl ParseableServer for IngestServer {
9998

10099
/// implement the init method will just invoke the initialize method
101100
async fn init(&self) -> anyhow::Result<()> {
102-
// self.validate()?;
103101
self.initialize().await
104102
}
105103

106-
#[allow(unused)]
107104
fn validate(&self) -> anyhow::Result<()> {
108105
if CONFIG.get_storage_mode_string() == "Local drive" {
109106
return Err(anyhow::Error::msg(
@@ -122,7 +119,10 @@ impl IngestServer {
122119
config
123120
.service(
124121
// Base path "{url}/api/v1"
125-
web::scope(&base_path()).service(Server::get_ingest_factory()),
122+
web::scope(&base_path())
123+
.service(Server::get_query_factory())
124+
.service(Server::get_ingest_factory())
125+
.service(Self::logstream_api()),
126126
)
127127
.service(Server::get_liveness_factory())
128128
.service(Server::get_readiness_factory())
@@ -150,46 +150,74 @@ impl IngestServer {
150150
)
151151
}
152152

153-
#[inline(always)]
154-
fn get_ingestor_address(&self) -> SocketAddr {
155-
// this might cause an issue down the line
156-
// best is to make the Cli Struct better, but thats a chore
157-
(CONFIG.parseable.address.clone())
158-
.parse::<SocketAddr>()
159-
.unwrap()
153+
fn logstream_api() -> Scope {
154+
web::scope("/logstream")
155+
.service(
156+
// GET "/logstream" ==> Get list of all Log Streams on the server
157+
web::resource("")
158+
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
159+
)
160+
.service(
161+
web::scope("/{logstream}")
162+
.service(
163+
web::resource("")
164+
// PUT "/logstream/{logstream}" ==> Create log stream
165+
.route(
166+
web::put()
167+
.to(logstream::put_stream)
168+
.authorize_for_stream(Action::CreateStream),
169+
)
170+
// DELETE "/logstream/{logstream}" ==> Delete log stream
171+
.route(
172+
web::delete()
173+
.to(logstream::delete)
174+
.authorize_for_stream(Action::DeleteStream),
175+
)
176+
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
177+
)
178+
.service(
179+
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
180+
web::resource("/stats").route(
181+
web::get()
182+
.to(logstream::get_stats)
183+
.authorize_for_stream(Action::GetStats),
184+
),
185+
),
186+
)
160187
}
161188

162-
// create the ingestor metadata and put the .ingestor.json file in the object store
163-
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
189+
// create the ingester metadata and put the .ingester.json file in the object store
190+
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
164191
let store = CONFIG.storage().get_object_store();
165192

166193
// remove ip and go with the domain name
167-
let sock = self.get_ingestor_address();
194+
let sock = Server::get_server_address();
168195
let path = RelativePathBuf::from(format!(
169-
"ingestor.{}.{}.json",
196+
"ingester.{}.{}.json",
170197
sock.ip(), // this might be wrong
171198
sock.port()
172199
));
173200

174201
if store.get_object(&path).await.is_ok() {
175-
println!("Ingestor metadata already exists");
202+
println!("Ingester metadata already exists");
176203
return Ok(());
177204
};
178205

206+
let scheme = CONFIG.parseable.get_scheme();
179207
let resource = IngesterMetadata::new(
180208
sock.port().to_string(),
181209
CONFIG
182210
.parseable
183211
.domain_address
184212
.clone()
185213
.unwrap_or_else(|| {
186-
Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
214+
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
187215
})
188216
.to_string(),
189217
DEFAULT_VERSION.to_string(),
190218
store.get_bucket_name(),
191219
&CONFIG.parseable.username,
192-
&CONFIG.parseable.password, // is this secure?
220+
&CONFIG.parseable.password,
193221
);
194222

195223
let resource = serde_json::to_string(&resource)
@@ -203,7 +231,7 @@ impl IngestServer {
203231
}
204232

205233
// check for querier state. Is it there, or was it there in the past
206-
// this should happen before the set the ingestor metadata
234+
// this should happen before the set the ingester metadata
207235
async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
208236
// how do we check for querier state?
209237
// based on the work flow of the system, the querier will always need to start first

0 commit comments

Comments (0)