Skip to content

Commit 97eff97

Browse files
author
Devdutt Shenoi
committed
refactor: encapsulate create_stream_and_schema_from_storage
1 parent cb8b6c1 commit 97eff97

File tree

8 files changed

+158
-94
lines changed

8 files changed

+158
-94
lines changed

src/handlers/http/ingest.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ use crate::event::{
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
29-
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
3029
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3130
use crate::metadata::error::stream_info::MetadataError;
3231
use crate::metadata::{SchemaVersion, STREAM_INFO};
3332
use crate::option::{Mode, CONFIG};
3433
use crate::otel::logs::flatten_otel_logs;
3534
use crate::otel::metrics::flatten_otel_metrics;
3635
use crate::otel::traces::flatten_otel_traces;
36+
use crate::parseable::PARSEABLE;
3737
use crate::storage::{ObjectStorageError, StreamType};
3838
use crate::utils::header_parsing::ParseHeaderError;
3939
use crate::utils::json::flatten::JsonFlattenError;
@@ -213,6 +213,7 @@ pub async fn post_event(
213213
Json(json): Json<Value>,
214214
) -> Result<HttpResponse, PostError> {
215215
let stream_name = stream_name.into_inner();
216+
216217
let internal_stream_names = STREAM_INFO.list_internal_streams();
217218
if internal_stream_names.contains(&stream_name) {
218219
return Err(PostError::InternalStream(stream_name));
@@ -222,7 +223,10 @@ pub async fn post_event(
222223
//check if it exists in the storage
223224
//create stream and schema from storage
224225
if CONFIG.options.mode != Mode::All {
225-
match create_stream_and_schema_from_storage(&stream_name).await {
226+
match PARSEABLE
227+
.create_stream_and_schema_from_storage(&stream_name)
228+
.await
229+
{
226230
Ok(true) => {}
227231
Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())),
228232
}
@@ -277,7 +281,9 @@ pub async fn create_stream_if_not_exists(
277281
//check if it exists in the storage
278282
//create stream and schema from storage
279283
if CONFIG.options.mode != Mode::All
280-
&& create_stream_and_schema_from_storage(stream_name).await?
284+
&& PARSEABLE
285+
.create_stream_and_schema_from_storage(stream_name)
286+
.await?
281287
{
282288
return Ok(stream_exists);
283289
}

src/handlers/http/logstream.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ use self::error::{CreateStreamError, StreamError};
2020
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
2121
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2222
use super::ingest::create_stream_if_not_exists;
23-
use super::modal::utils::logstream_utils::{
24-
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
25-
};
23+
use super::modal::utils::logstream_utils::{create_update_stream, update_first_event_at};
2624
use super::query::update_schema_when_distributed;
2725
use crate::alerts::Alerts;
2826
use crate::event::format::{override_data_type, LogSource};
@@ -31,6 +29,7 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3129
use crate::metadata::{SchemaVersion, STREAM_INFO};
3230
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
3331
use crate::option::{Mode, CONFIG};
32+
use crate::parseable::PARSEABLE;
3433
use crate::rbac::role::Action;
3534
use crate::rbac::Users;
3635
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
@@ -138,7 +137,10 @@ pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamE
138137
match STREAM_INFO.schema(&stream_name) {
139138
Ok(_) => {}
140139
Err(_) if CONFIG.options.mode == Mode::Query => {
141-
if !create_stream_and_schema_from_storage(&stream_name).await? {
140+
if !PARSEABLE
141+
.create_stream_and_schema_from_storage(&stream_name)
142+
.await?
143+
{
142144
return Err(StreamError::StreamNotFound(stream_name.clone()));
143145
}
144146
}
@@ -225,7 +227,10 @@ pub async fn put_alert(
225227
//check if it exists in the storage
226228
//create stream and schema from storage
227229
if CONFIG.options.mode == Mode::Query {
228-
match create_stream_and_schema_from_storage(&stream_name).await {
230+
match PARSEABLE
231+
.create_stream_and_schema_from_storage(&stream_name)
232+
.await
233+
{
229234
Ok(true) => {}
230235
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
231236
}
@@ -273,7 +278,10 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
273278
//check if it exists in the storage
274279
//create stream and schema from storage
275280
if CONFIG.options.mode == Mode::Query {
276-
match create_stream_and_schema_from_storage(&stream_name).await {
281+
match PARSEABLE
282+
.create_stream_and_schema_from_storage(&stream_name)
283+
.await
284+
{
277285
Ok(true) => {}
278286
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
279287
}
@@ -306,7 +314,10 @@ pub async fn put_retention(
306314
//check if it exists in the storage
307315
//create stream and schema from storage
308316
if CONFIG.options.mode == Mode::Query {
309-
match create_stream_and_schema_from_storage(&stream_name).await {
317+
match PARSEABLE
318+
.create_stream_and_schema_from_storage(&stream_name)
319+
.await
320+
{
310321
Ok(true) => {}
311322
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
312323
}
@@ -371,7 +382,10 @@ pub async fn get_stats(
371382
//check if it exists in the storage
372383
//create stream and schema from storage
373384
if cfg!(not(test)) && CONFIG.options.mode == Mode::Query {
374-
match create_stream_and_schema_from_storage(&stream_name).await {
385+
match PARSEABLE
386+
.create_stream_and_schema_from_storage(&stream_name)
387+
.await
388+
{
375389
Ok(true) => {}
376390
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
377391
}
@@ -541,7 +555,10 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
541555
let stream_name = stream_name.into_inner();
542556
if !STREAM_INFO.stream_exists(&stream_name) {
543557
if CONFIG.options.mode == Mode::Query {
544-
match create_stream_and_schema_from_storage(&stream_name).await {
558+
match PARSEABLE
559+
.create_stream_and_schema_from_storage(&stream_name)
560+
.await
561+
{
545562
Ok(true) => {}
546563
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
547564
}
@@ -594,7 +611,10 @@ pub async fn put_stream_hot_tier(
594611
//check if it exists in the storage
595612
//create stream and schema from storage
596613
if CONFIG.options.mode == Mode::Query {
597-
match create_stream_and_schema_from_storage(&stream_name).await {
614+
match PARSEABLE
615+
.create_stream_and_schema_from_storage(&stream_name)
616+
.await
617+
{
598618
Ok(true) => {}
599619
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
600620
}
@@ -646,7 +666,10 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
646666
//check if it exists in the storage
647667
//create stream and schema from storage
648668
if CONFIG.options.mode == Mode::Query {
649-
match create_stream_and_schema_from_storage(&stream_name).await {
669+
match PARSEABLE
670+
.create_stream_and_schema_from_storage(&stream_name)
671+
.await
672+
{
650673
Ok(true) => {}
651674
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
652675
}
@@ -673,7 +696,10 @@ pub async fn delete_stream_hot_tier(
673696
//check if it exists in the storage
674697
//create stream and schema from storage
675698
if CONFIG.options.mode == Mode::Query {
676-
match create_stream_and_schema_from_storage(&stream_name).await {
699+
match PARSEABLE
700+
.create_stream_and_schema_from_storage(&stream_name)
701+
.await
702+
{
677703
Ok(true) => {}
678704
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
679705
}

src/handlers/http/modal/ingest/ingestor_logstream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ use crate::{
2828
catalog::remove_manifest_from_snapshot,
2929
event,
3030
handlers::http::{
31-
logstream::error::StreamError,
32-
modal::utils::logstream_utils::{
33-
create_stream_and_schema_from_storage, create_update_stream,
34-
},
31+
logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream,
3532
},
3633
metadata,
3734
option::CONFIG,
35+
parseable::PARSEABLE,
3836
stats,
3937
};
4038

@@ -48,7 +46,8 @@ pub async fn retention_cleanup(
4846
//check if it exists in the storage
4947
//create stream and schema from storage
5048
if !metadata::STREAM_INFO.stream_exists(&stream_name)
51-
&& !create_stream_and_schema_from_storage(&stream_name)
49+
&& !PARSEABLE
50+
.create_stream_and_schema_from_storage(&stream_name)
5251
.await
5352
.unwrap_or(false)
5453
{
@@ -67,7 +66,8 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
6766
//check if it exists in the storage
6867
//create stream and schema from storage
6968
if !metadata::STREAM_INFO.stream_exists(&stream_name)
70-
&& !create_stream_and_schema_from_storage(&stream_name)
69+
&& !PARSEABLE
70+
.create_stream_and_schema_from_storage(&stream_name)
7171
.await
7272
.unwrap_or(false)
7373
{

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,12 @@ use crate::{
4141
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
4242
},
4343
logstream::{error::StreamError, get_stats_date},
44-
modal::utils::logstream_utils::{
45-
create_stream_and_schema_from_storage, create_update_stream,
46-
},
44+
modal::utils::logstream_utils::create_update_stream,
4745
},
4846
hottier::HotTierManager,
4947
metadata::{self, STREAM_INFO},
5048
option::CONFIG,
49+
parseable::PARSEABLE,
5150
stats::{self, Stats},
5251
storage::{StorageDir, StreamType},
5352
};
@@ -59,7 +58,8 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
5958
//check if it exists in the storage
6059
//create stream and schema from storage
6160
if !metadata::STREAM_INFO.stream_exists(&stream_name)
62-
&& !create_stream_and_schema_from_storage(&stream_name)
61+
&& !PARSEABLE
62+
.create_stream_and_schema_from_storage(&stream_name)
6363
.await
6464
.unwrap_or(false)
6565
{
@@ -132,7 +132,8 @@ pub async fn get_stats(
132132
//check if it exists in the storage
133133
//create stream and schema from storage
134134
if !metadata::STREAM_INFO.stream_exists(&stream_name)
135-
&& !create_stream_and_schema_from_storage(&stream_name)
135+
&& !PARSEABLE
136+
.create_stream_and_schema_from_storage(&stream_name)
136137
.await
137138
.unwrap_or(false)
138139
{

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 5 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ use crate::{
3232
},
3333
metadata::{self, SchemaVersion, STREAM_INFO},
3434
option::{Mode, CONFIG},
35+
parseable::PARSEABLE,
3536
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
36-
storage::{LogStream, ObjectStoreFormat, StreamType},
37+
storage::StreamType,
3738
validator,
3839
};
3940
use tracing::error;
@@ -64,7 +65,9 @@ pub async fn create_update_stream(
6465

6566
if !metadata::STREAM_INFO.stream_exists(stream_name)
6667
&& CONFIG.options.mode == Mode::Query
67-
&& create_stream_and_schema_from_storage(stream_name).await?
68+
&& PARSEABLE
69+
.create_stream_and_schema_from_storage(stream_name)
70+
.await?
6871
{
6972
return Err(StreamError::Custom {
7073
msg: format!(
@@ -447,69 +450,6 @@ pub async fn create_stream(
447450
Ok(())
448451
}
449452

450-
/// list all streams from storage
451-
/// if stream exists in storage, create stream and schema from storage
452-
/// and add it to the memory map
453-
pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
454-
// Proceed to create log stream if it doesn't exist
455-
let storage = CONFIG.storage().get_object_store();
456-
let streams = storage.list_streams().await?;
457-
if streams.contains(&LogStream {
458-
name: stream_name.to_owned(),
459-
}) {
460-
let mut stream_metadata = ObjectStoreFormat::default();
461-
let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
462-
if !stream_metadata_bytes.is_empty() {
463-
stream_metadata = serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?;
464-
}
465-
466-
let mut schema = Arc::new(Schema::empty());
467-
let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?;
468-
if !schema_bytes.is_empty() {
469-
schema = serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?;
470-
}
471-
472-
let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
473-
474-
for (field_name, field) in schema
475-
.fields()
476-
.iter()
477-
.map(|field| (field.name().to_string(), field.clone()))
478-
{
479-
static_schema.insert(field_name, field);
480-
}
481-
482-
let time_partition = stream_metadata.time_partition.as_deref().unwrap_or("");
483-
let time_partition_limit = stream_metadata
484-
.time_partition_limit
485-
.and_then(|limit| limit.parse().ok());
486-
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
487-
let static_schema_flag = stream_metadata.static_schema_flag;
488-
let stream_type = stream_metadata
489-
.stream_type
490-
.map(|s| StreamType::from(s.as_str()))
491-
.unwrap_or_default();
492-
let schema_version = stream_metadata.schema_version;
493-
let log_source = stream_metadata.log_source;
494-
metadata::STREAM_INFO.add_stream(
495-
stream_name.to_string(),
496-
stream_metadata.created_at,
497-
time_partition.to_string(),
498-
time_partition_limit,
499-
custom_partition.to_string(),
500-
static_schema_flag,
501-
static_schema,
502-
stream_type,
503-
schema_version,
504-
log_source,
505-
);
506-
} else {
507-
return Ok(false);
508-
}
509-
510-
Ok(true)
511-
}
512-
513453
/// Updates the first-event-at in storage and logstream metadata for the specified stream.
514454
///
515455
/// This function updates the `first-event-at` in both the object store and the stream info metadata.

src/handlers/http/query.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::metadata::STREAM_INFO;
4040
use crate::event::commit_schema;
4141
use crate::metrics::QUERY_EXECUTE_TIME;
4242
use crate::option::{Mode, CONFIG};
43+
use crate::parseable::PARSEABLE;
4344
use crate::query::error::ExecuteError;
4445
use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
4546
use crate::query::{TableScanVisitor, QUERY_SESSION};
@@ -51,8 +52,6 @@ use crate::utils::actix::extract_session_key_from_req;
5152
use crate::utils::time::{TimeParseError, TimeRange};
5253
use crate::utils::user_auth_for_query;
5354

54-
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
55-
5655
/// Query Request through http endpoint.
5756
#[derive(Debug, Deserialize, Serialize, Clone)]
5857
#[serde(rename_all = "camelCase")]
@@ -195,7 +194,9 @@ pub async fn create_streams_for_querier() {
195194
let stream_name = stream.name;
196195

197196
if !querier_streams.contains(&stream_name) {
198-
let _ = create_stream_and_schema_from_storage(&stream_name).await;
197+
let _ = PARSEABLE
198+
.create_stream_and_schema_from_storage(&stream_name)
199+
.await;
199200
}
200201
}
201202
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub mod migration;
3939
mod oidc;
4040
pub mod option;
4141
pub mod otel;
42+
mod parseable;
4243
mod query;
4344
pub mod rbac;
4445
mod response;

0 commit comments

Comments
 (0)