Skip to content

Commit bfdc0f7

Browse files
committed
Impl Ingest Server
add DELETE endpoint for stream on IngestServer add QuriedStats struct add stats endpoint for logstream on Ingest Server add guard to create stream automatically during ingest on Ingest Server fix: api path /logstream on ingest server make get_ingestor_info func pub add logstream api endpoint to put fix: func dir_with_stream to use the proper stream metadata filename fix: update stream metadata file name make stream_json_path give the correct file path when in different server modes
1 parent 0b9a277 commit bfdc0f7

File tree

10 files changed

+566
-68
lines changed

10 files changed

+566
-68
lines changed

server/src/handlers/http.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ pub(crate) mod query;
3232
pub(crate) mod rbac;
3333
pub(crate) mod role;
3434

35-
// this needs to be removed from here. It is in modal->mod.rs
36-
// include!(concat!(env!("OUT_DIR"), "/generated.rs"));
37-
3835
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
3936
pub const API_BASE_PATH: &str = "/api";
4037
pub const API_VERSION: &str = "v1";

server/src/handlers/http/ingest.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::handlers::{
3232
STREAM_NAME_HEADER_KEY,
3333
};
3434
use crate::metadata::STREAM_INFO;
35+
use crate::option::{Mode, CONFIG};
3536
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3637

3738
use super::logstream::error::CreateStreamError;
@@ -140,7 +141,18 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
140141
if STREAM_INFO.stream_exists(stream_name) {
141142
return Ok(());
142143
}
143-
super::logstream::create_stream(stream_name.to_string()).await?;
144+
145+
match &CONFIG.parseable.mode {
146+
Mode::All | Mode::Query => {
147+
super::logstream::create_stream(stream_name.to_string()).await?;
148+
}
149+
Mode::Ingest => {
150+
return Err(PostError::Invalid(anyhow::anyhow!(
151+
"Stream {} not found. Has it been created?",
152+
stream_name
153+
)));
154+
}
155+
}
144156
Ok(())
145157
}
146158

server/src/handlers/http/logstream.rs

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ use serde_json::Value;
2525

2626
use crate::alerts::Alerts;
2727
use crate::metadata::STREAM_INFO;
28-
use crate::option::CONFIG;
28+
use crate::option::{Mode, CONFIG};
2929
use crate::storage::retention::Retention;
3030
use crate::storage::{LogStream, StorageDir};
3131
use crate::{catalog, event, stats};
3232
use crate::{metadata, validator};
3333

3434
use self::error::{CreateStreamError, StreamError};
3535

36+
use super::modal::query_server::{self, IngestionStats, QueriedStats, StorageStats};
37+
3638
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3739
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
3840

@@ -111,7 +113,6 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
111113

112114
pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
113115
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
114-
115116
if metadata::STREAM_INFO.stream_exists(&stream_name) {
116117
// Error if the log stream already exists
117118
return Err(StreamError::Custom {
@@ -121,7 +122,11 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
121122
status: StatusCode::BAD_REQUEST,
122123
});
123124
}
124-
create_stream(stream_name).await?;
125+
if CONFIG.parseable.mode == Mode::Query {
126+
query_server::QueryServer::sync_streams_with_ingesters(&stream_name).await?;
127+
}
128+
129+
create_stream(stream_name.clone()).await?;
125130

126131
Ok(("log stream created", StatusCode::OK))
127132
}
@@ -279,30 +284,62 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
279284
let stats = stats::get_current_stats(&stream_name, "json")
280285
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
281286

287+
if CONFIG.parseable.mode == Mode::Query {
288+
let stats = query_server::QueryServer::fetch_stats_from_ingesters(&stream_name).await?;
289+
let stats = serde_json::to_value(stats).unwrap();
290+
return Ok((web::Json(stats), StatusCode::OK));
291+
}
292+
282293
let hash_map = STREAM_INFO.read().unwrap();
283294
let stream_meta = &hash_map
284295
.get(&stream_name)
285296
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
286297

287298
let time = Utc::now();
299+
let qstats = match &stream_meta.first_event_at {
300+
Some(first_event_at) => {
301+
let ingestion_stats = IngestionStats::new(
302+
stats.events,
303+
format!("{} {}", stats.ingestion, "Bytes"),
304+
"json",
305+
);
306+
let storage_stats =
307+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
308+
309+
QueriedStats::new(
310+
&stream_name,
311+
&stream_meta.created_at,
312+
Some(first_event_at.to_owned()),
313+
time,
314+
ingestion_stats,
315+
storage_stats,
316+
)
317+
}
288318

289-
let stats = serde_json::json!({
290-
"stream": stream_name,
291-
"creation_time": &stream_meta.created_at,
292-
"first_event_at": Some(&stream_meta.first_event_at),
293-
"time": time,
294-
"ingestion": {
295-
"count": stats.events,
296-
"size": format!("{} {}", stats.ingestion, "Bytes"),
297-
"format": "json"
298-
},
299-
"storage": {
300-
"size": format!("{} {}", stats.storage, "Bytes"),
301-
"format": "parquet"
319+
// ? this case should not happen
320+
None => {
321+
let ingestion_stats = IngestionStats::new(
322+
stats.events,
323+
format!("{} {}", stats.ingestion, "Bytes"),
324+
"json",
325+
);
326+
let storage_stats =
327+
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
328+
329+
QueriedStats::new(
330+
&stream_name,
331+
&stream_meta.created_at,
332+
Some('0'.to_string()),
333+
time,
334+
ingestion_stats,
335+
storage_stats,
336+
)
302337
}
303-
});
338+
};
339+
340+
let out_stats = serde_json::to_value(qstats).unwrap();
304341

305-
Ok((web::Json(stats), StatusCode::OK))
342+
Ok((web::Json(out_stats), StatusCode::OK))
306343
}
307344

308345
// Check if the first_event_at is empty
@@ -345,7 +382,6 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
345382
let created_at = stream_meta.unwrap().created_at;
346383

347384
metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
348-
349385
Ok(())
350386
}
351387

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::analytics;
2020
use crate::banner;
2121
use crate::handlers::http::logstream;
2222
use crate::handlers::http::middleware::RouteExt;
23+
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
2324
use crate::localcache::LocalCacheManager;
2425
use crate::metadata;
2526
use crate::metrics;
@@ -63,8 +64,8 @@ impl ParseableServer for IngestServer {
6364
prometheus: PrometheusMetrics,
6465
_oidc_client: Option<crate::oidc::OpenidConfig>,
6566
) -> anyhow::Result<()> {
66-
// set the ingestor metadata
67-
self.set_ingestor_metadata().await?;
67+
// set the ingester metadata
68+
self.set_ingester_metadata().await?;
6869

6970
// get the ssl stuff
7071
let ssl = get_ssl_acceptor(
@@ -122,7 +123,9 @@ impl IngestServer {
122123
config
123124
.service(
124125
// Base path "{url}/api/v1"
125-
web::scope(&base_path()).service(Server::get_ingest_factory()),
126+
web::scope(&base_path())
127+
.service(Server::get_ingest_factory())
128+
.service(Self::logstream_api()),
126129
)
127130
.service(Server::get_liveness_factory())
128131
.service(Server::get_readiness_factory())
@@ -151,39 +154,76 @@ impl IngestServer {
151154
}
152155

153156
#[inline(always)]
154-
fn get_ingestor_address(&self) -> SocketAddr {
157+
fn get_ingester_address(&self) -> SocketAddr {
155158
// this might cause an issue down the line
156159
// best is to make the Cli Struct better, but thats a chore
157160
(CONFIG.parseable.address.clone())
158161
.parse::<SocketAddr>()
159162
.unwrap()
160163
}
161164

162-
// create the ingestor metadata and put the .ingestor.json file in the object store
163-
async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
165+
fn logstream_api() -> Scope {
166+
web::scope("/logstream")
167+
.service(
168+
// GET "/logstream" ==> Get list of all Log Streams on the server
169+
web::resource("")
170+
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
171+
)
172+
.service(
173+
web::scope("/{logstream}")
174+
.service(
175+
web::resource("")
176+
// PUT "/logstream/{logstream}" ==> Create log stream
177+
.route(
178+
web::put()
179+
.to(logstream::put_stream)
180+
.authorize_for_stream(Action::CreateStream),
181+
)
182+
// DELETE "/logstream/{logstream}" ==> Delete log stream
183+
.route(
184+
web::delete()
185+
.to(logstream::delete)
186+
.authorize_for_stream(Action::DeleteStream),
187+
)
188+
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
189+
)
190+
.service(
191+
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
192+
web::resource("/stats").route(
193+
web::get()
194+
.to(logstream::get_stats)
195+
.authorize_for_stream(Action::GetStats),
196+
),
197+
),
198+
)
199+
}
200+
201+
// create the ingester metadata and put the .ingester.json file in the object store
202+
async fn set_ingester_metadata(&self) -> anyhow::Result<()> {
164203
let store = CONFIG.storage().get_object_store();
165204

166205
// remove ip adn go with the domain name
167-
let sock = self.get_ingestor_address();
206+
let sock = self.get_ingester_address();
168207
let path = RelativePathBuf::from(format!(
169-
"ingestor.{}.{}.json",
208+
"ingester.{}.{}.json",
170209
sock.ip(), // this might be wrong
171210
sock.port()
172211
));
173212

174213
if store.get_object(&path).await.is_ok() {
175-
println!("Ingestor metadata already exists");
214+
println!("Ingester metadata already exists");
176215
return Ok(());
177216
};
178217

218+
let scheme = CONFIG.parseable.get_scheme();
179219
let resource = IngesterMetadata::new(
180220
sock.port().to_string(),
181221
CONFIG
182222
.parseable
183223
.domain_address
184224
.clone()
185225
.unwrap_or_else(|| {
186-
Url::parse(&format!("http://{}:{}", sock.ip(), sock.port())).unwrap()
226+
Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap()
187227
})
188228
.to_string(),
189229
DEFAULT_VERSION.to_string(),
@@ -203,7 +243,7 @@ impl IngestServer {
203243
}
204244

205245
// check for querier state. Is it there, or was it there in the past
206-
// this should happen before the set the ingestor metadata
246+
// this should happen before the set the ingester metadata
207247
async fn check_querier_state(&self) -> anyhow::Result<(), ObjectStorageError> {
208248
// how do we check for querier state?
209249
// based on the work flow of the system, the querier will always need to start first

0 commit comments

Comments
 (0)