Skip to content

Commit 2fac5a9

Browse files
refactor, added comments, added license info
1 parent f537c9f commit 2fac5a9

22 files changed

+359
-26
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::localcache::CacheError;
3232
use crate::metadata::error::stream_info::MetadataError;
3333
use crate::metadata::STREAM_INFO;
3434
use crate::option::{Mode, CONFIG};
35-
use crate::storage::{LogStream, ObjectStorageError, StreamType};
35+
use crate::storage::{ObjectStorageError, StreamType};
3636
use crate::utils::header_parsing::ParseHeaderError;
3737
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
3838
use arrow_array::RecordBatch;
@@ -153,6 +153,9 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
153153
)));
154154
}
155155
if !STREAM_INFO.stream_exists(&stream_name) {
156+
// For distributed deployments, if the stream not found in memory map,
157+
//check if it exists in the storage
158+
//create stream and schema from storage
156159
if CONFIG.parseable.mode != Mode::All {
157160
match create_stream_and_schema_from_storage(&stream_name).await {
158161
Ok(true) => {}
@@ -198,15 +201,11 @@ pub async fn create_stream_if_not_exists(
198201
return Ok(stream_exists);
199202
}
200203

204+
// For distributed deployments, if the stream not found in memory map,
205+
//check if it exists in the storage
206+
//create stream and schema from storage
201207
if CONFIG.parseable.mode != Mode::All {
202-
let store = CONFIG.storage().get_object_store();
203-
let streams = store.list_streams().await?;
204-
if streams.contains(&LogStream {
205-
name: stream_name.to_owned(),
206-
}) {
207-
create_stream_and_schema_from_storage(stream_name).await?;
208-
return Ok(stream_exists);
209-
}
208+
return Ok(create_stream_and_schema_from_storage(stream_name).await?);
210209
}
211210

212211
super::logstream::create_stream(

server/src/handlers/http/logstream.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
8484
}
8585

8686
pub async fn list(_: HttpRequest) -> impl Responder {
87+
//list all streams from storage
8788
let res = CONFIG
8889
.storage()
8990
.get_object_store()
@@ -118,6 +119,10 @@ pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
118119
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
119120
let schema = match STREAM_INFO.schema(&stream_name) {
120121
Ok(schema) => schema,
122+
123+
//if schema not found in memory map
124+
//create stream and schema from storage and memory
125+
//return from memory map
121126
Err(_) if CONFIG.parseable.mode == Mode::Query => {
122127
if create_stream_and_schema_from_storage(&stream_name).await? {
123128
STREAM_INFO.schema(&stream_name)?
@@ -194,6 +199,9 @@ pub async fn put_alert(
194199
validator::alert(&alerts)?;
195200

196201
if !STREAM_INFO.stream_initialized(&stream_name)? {
202+
// For query mode, if the stream not found in memory map,
203+
//check if it exists in the storage
204+
//create stream and schema from storage
197205
if CONFIG.parseable.mode == Mode::Query {
198206
match create_stream_and_schema_from_storage(&stream_name).await {
199207
Ok(true) => {}
@@ -239,6 +247,9 @@ pub async fn put_alert(
239247
pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamError> {
240248
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
241249
if !STREAM_INFO.stream_exists(&stream_name) {
250+
// For query mode, if the stream not found in memory map,
251+
//check if it exists in the storage
252+
//create stream and schema from storage
242253
if CONFIG.parseable.mode == Mode::Query {
243254
match create_stream_and_schema_from_storage(&stream_name).await {
244255
Ok(true) => {}
@@ -267,6 +278,21 @@ pub async fn put_retention(
267278
body: web::Json<serde_json::Value>,
268279
) -> Result<impl Responder, StreamError> {
269280
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
281+
282+
if !STREAM_INFO.stream_exists(&stream_name) {
283+
// For query mode, if the stream not found in memory map,
284+
//check if it exists in the storage
285+
//create stream and schema from storage
286+
if CONFIG.parseable.mode == Mode::Query {
287+
match create_stream_and_schema_from_storage(&stream_name).await {
288+
Ok(true) => {}
289+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
290+
}
291+
} else {
292+
return Err(StreamError::StreamNotFound(stream_name));
293+
}
294+
}
295+
270296
let body = body.into_inner();
271297

272298
let retention: Retention = match serde_json::from_value(body) {
@@ -356,6 +382,9 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
356382
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
357383

358384
if !STREAM_INFO.stream_exists(&stream_name) {
385+
// For query mode, if the stream not found in memory map,
386+
//check if it exists in the storage
387+
//create stream and schema from storage
359388
if CONFIG.parseable.mode == Mode::Query {
360389
match create_stream_and_schema_from_storage(&stream_name).await {
361390
Ok(true) => {}
@@ -582,6 +611,9 @@ pub async fn put_stream_hot_tier(
582611
) -> Result<impl Responder, StreamError> {
583612
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
584613
if !STREAM_INFO.stream_exists(&stream_name) {
614+
// For query mode, if the stream not found in memory map,
615+
//check if it exists in the storage
616+
//create stream and schema from storage
585617
if CONFIG.parseable.mode == Mode::Query {
586618
match create_stream_and_schema_from_storage(&stream_name).await {
587619
Ok(true) => {}
@@ -639,6 +671,9 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
639671
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
640672

641673
if !STREAM_INFO.stream_exists(&stream_name) {
674+
// For query mode, if the stream not found in memory map,
675+
//check if it exists in the storage
676+
//create stream and schema from storage
642677
if CONFIG.parseable.mode == Mode::Query {
643678
match create_stream_and_schema_from_storage(&stream_name).await {
644679
Ok(true) => {}
@@ -671,6 +706,9 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
671706
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
672707

673708
if !STREAM_INFO.stream_exists(&stream_name) {
709+
// For query mode, if the stream not found in memory map,
710+
//check if it exists in the storage
711+
//create stream and schema from storage
674712
if CONFIG.parseable.mode == Mode::Query {
675713
match create_stream_and_schema_from_storage(&stream_name).await {
676714
Ok(true) => {}

server/src/handlers/http/modal/ingest/ingester_ingest.rs renamed to server/src/handlers/http/modal/ingest/ingestor_ingest.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
use actix_web::{HttpRequest, HttpResponse};
220
use bytes::Bytes;
321

server/src/handlers/http/modal/ingest/ingester_logstream.rs renamed to server/src/handlers/http/modal/ingest/ingestor_logstream.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
use actix_web::{web, HttpRequest, Responder};
220
use bytes::Bytes;
321
use http::StatusCode;
@@ -24,6 +42,9 @@ pub async fn retention_cleanup(
2442
) -> Result<impl Responder, StreamError> {
2543
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
2644
let storage = CONFIG.storage().get_object_store();
45+
// if the stream not found in memory map,
46+
//check if it exists in the storage
47+
//create stream and schema from storage
2748
if !metadata::STREAM_INFO.stream_exists(&stream_name)
2849
&& !create_stream_and_schema_from_storage(&stream_name)
2950
.await
@@ -41,6 +62,9 @@ pub async fn retention_cleanup(
4162

4263
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4364
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
65+
// if the stream not found in memory map,
66+
//check if it exists in the storage
67+
//create stream and schema from storage
4468
if !metadata::STREAM_INFO.stream_exists(&stream_name)
4569
&& !create_stream_and_schema_from_storage(&stream_name)
4670
.await

server/src/handlers/http/modal/ingest/ingester_rbac.rs renamed to server/src/handlers/http/modal/ingest/ingestor_rbac.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
use std::collections::HashSet;
220

321
use actix_web::{web, Responder};

server/src/handlers/http/modal/ingest/ingester_role.rs renamed to server/src/handlers/http/modal/ingest/ingestor_role.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
use actix_web::{web, HttpResponse, Responder};
220
use bytes::Bytes;
321

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1-
pub mod ingester_logstream;
2-
pub mod ingester_rbac;
3-
pub mod ingester_role;
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
pub mod ingestor_logstream;
20+
pub mod ingestor_rbac;
21+
pub mod ingestor_role;

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use crate::sync;
4040

4141
use std::sync::Arc;
4242

43-
use super::ingest::ingester_logstream;
44-
use super::ingest::ingester_rbac;
45-
use super::ingest::ingester_role;
43+
use super::ingest::ingestor_logstream;
44+
use super::ingest::ingestor_rbac;
45+
use super::ingest::ingestor_role;
4646
use super::server::Server;
4747
use super::ssl_acceptor::get_ssl_acceptor;
4848
use super::IngestorMetadata;
@@ -251,7 +251,7 @@ impl IngestServer {
251251
)
252252
.service(
253253
resource("/{name}/sync")
254-
.route(web::put().to(ingester_role::put).authorize(Action::PutRole)),
254+
.route(web::put().to(ingestor_role::put).authorize(Action::PutRole)),
255255
)
256256
}
257257
// get the user webscope
@@ -262,13 +262,13 @@ impl IngestServer {
262262
// PUT /user/{username}/sync => Sync creation of a new user
263263
.route(
264264
web::post()
265-
.to(ingester_rbac::post_user)
265+
.to(ingestor_rbac::post_user)
266266
.authorize(Action::PutUser),
267267
)
268268
// DELETE /user/{username} => Sync deletion of a user
269269
.route(
270270
web::delete()
271-
.to(ingester_rbac::delete_user)
271+
.to(ingestor_rbac::delete_user)
272272
.authorize(Action::DeleteUser),
273273
)
274274
.wrap(DisAllowRootUser),
@@ -278,7 +278,7 @@ impl IngestServer {
278278
// PUT /user/{username}/roles => Put roles for user
279279
.route(
280280
web::put()
281-
.to(ingester_rbac::put_role)
281+
.to(ingestor_rbac::put_role)
282282
.authorize(Action::PutUserRoles)
283283
.wrap(DisAllowRootUser),
284284
),
@@ -288,7 +288,7 @@ impl IngestServer {
288288
// POST /user/{username}/generate-new-password => reset password for this user
289289
.route(
290290
web::post()
291-
.to(ingester_rbac::post_gen_password)
291+
.to(ingestor_rbac::post_gen_password)
292292
.authorize(Action::PutUser)
293293
.wrap(DisAllowRootUser),
294294
),
@@ -311,13 +311,13 @@ impl IngestServer {
311311
// DELETE "/logstream/{logstream}/sync" ==> Sync deletion of a log stream
312312
.route(
313313
web::delete()
314-
.to(ingester_logstream::delete)
314+
.to(ingestor_logstream::delete)
315315
.authorize(Action::DeleteStream),
316316
)
317317
// PUT "/logstream/{logstream}/sync" ==> Sync creation of a new log stream
318318
.route(
319319
web::put()
320-
.to(ingester_logstream::put_stream)
320+
.to(ingestor_logstream::put_stream)
321321
.authorize_for_stream(Action::CreateStream),
322322
),
323323
)
@@ -342,21 +342,21 @@ impl IngestServer {
342342
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
343343
.route(
344344
web::put()
345-
.to(ingester_logstream::put_enable_cache)
345+
.to(ingestor_logstream::put_enable_cache)
346346
.authorize_for_stream(Action::PutCacheEnabled),
347347
)
348348
// GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
349349
.route(
350350
web::get()
351-
.to(ingester_logstream::get_cache_enabled)
351+
.to(ingestor_logstream::get_cache_enabled)
352352
.authorize_for_stream(Action::GetCacheEnabled),
353353
),
354354
)
355355
.service(
356356
web::scope("/retention").service(
357357
web::resource("/cleanup").route(
358358
web::post()
359-
.to(ingester_logstream::retention_cleanup)
359+
.to(ingestor_logstream::retention_cleanup)
360360
.authorize_for_stream(Action::PutRetention),
361361
),
362362
),

server/src/handlers/http/modal/query/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
pub mod querier_ingest;
220
pub mod querier_logstream;
321
pub mod querier_rbac;

0 commit comments

Comments
 (0)