Skip to content

Commit f537c9f

Browse files
code refactor
1 parent c66f09d commit f537c9f

File tree

8 files changed

+101
-176
lines changed

8 files changed

+101
-176
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,9 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
154154
}
155155
if !STREAM_INFO.stream_exists(&stream_name) {
156156
if CONFIG.parseable.mode != Mode::All {
157-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
158-
if !stream_found {
159-
return Err(PostError::StreamNotFound(stream_name.clone()));
160-
}
161-
} else {
162-
return Err(PostError::StreamNotFound(stream_name.clone()));
157+
match create_stream_and_schema_from_storage(&stream_name).await {
158+
Ok(true) => {}
159+
Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())),
163160
}
164161
} else {
165162
return Err(PostError::StreamNotFound(stream_name.clone()));

server/src/handlers/http/logstream.rs

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,16 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
116116

117117
pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
118118
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
119-
let schema = if let Ok(schema) = STREAM_INFO.schema(&stream_name) {
120-
schema
121-
} else if CONFIG.parseable.mode == Mode::Query {
122-
let stream_found = create_stream_and_schema_from_storage(&stream_name).await?;
123-
if !stream_found {
124-
return Err(StreamError::StreamNotFound(stream_name.clone()));
125-
} else {
126-
STREAM_INFO.schema(&stream_name)?
119+
let schema = match STREAM_INFO.schema(&stream_name) {
120+
Ok(schema) => schema,
121+
Err(_) if CONFIG.parseable.mode == Mode::Query => {
122+
if create_stream_and_schema_from_storage(&stream_name).await? {
123+
STREAM_INFO.schema(&stream_name)?
124+
} else {
125+
return Err(StreamError::StreamNotFound(stream_name.clone()));
126+
}
127127
}
128-
} else {
129-
return Err(StreamError::StreamNotFound(stream_name));
128+
Err(_) => return Err(StreamError::StreamNotFound(stream_name)),
130129
};
131130

132131
Ok((web::Json(schema), StatusCode::OK))
@@ -196,12 +195,9 @@ pub async fn put_alert(
196195

197196
if !STREAM_INFO.stream_initialized(&stream_name)? {
198197
if CONFIG.parseable.mode == Mode::Query {
199-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
200-
if !stream_found {
201-
return Err(StreamError::StreamNotFound(stream_name.clone()));
202-
}
203-
} else {
204-
return Err(StreamError::StreamNotFound(stream_name.clone()));
198+
match create_stream_and_schema_from_storage(&stream_name).await {
199+
Ok(true) => {}
200+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
205201
}
206202
} else {
207203
return Err(StreamError::UninitializedLogstream);
@@ -244,12 +240,9 @@ pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamErr
244240
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
245241
if !STREAM_INFO.stream_exists(&stream_name) {
246242
if CONFIG.parseable.mode == Mode::Query {
247-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
248-
if !stream_found {
249-
return Err(StreamError::StreamNotFound(stream_name.clone()));
250-
}
251-
} else {
252-
return Err(StreamError::StreamNotFound(stream_name.clone()));
243+
match create_stream_and_schema_from_storage(&stream_name).await {
244+
Ok(true) => {}
245+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
253246
}
254247
} else {
255248
return Err(StreamError::StreamNotFound(stream_name));
@@ -362,14 +355,11 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, Stre
362355
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
363356
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
364357

365-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
366-
if CONFIG.parseable.mode != Mode::All {
367-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
368-
if !stream_found {
369-
return Err(StreamError::StreamNotFound(stream_name.clone()));
370-
}
371-
} else {
372-
return Err(StreamError::StreamNotFound(stream_name.clone()));
358+
if !STREAM_INFO.stream_exists(&stream_name) {
359+
if CONFIG.parseable.mode == Mode::Query {
360+
match create_stream_and_schema_from_storage(&stream_name).await {
361+
Ok(true) => {}
362+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
373363
}
374364
} else {
375365
return Err(StreamError::StreamNotFound(stream_name));
@@ -541,12 +531,11 @@ pub async fn create_stream(
541531

542532
pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
543533
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
544-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
534+
if !STREAM_INFO.stream_exists(&stream_name) {
545535
if CONFIG.parseable.mode == Mode::Query {
546-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
547-
if !stream_found {
548-
return Err(StreamError::StreamNotFound(stream_name.clone()));
549-
}
536+
match create_stream_and_schema_from_storage(&stream_name).await {
537+
Ok(true) => {}
538+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
550539
}
551540
} else {
552541
return Err(StreamError::StreamNotFound(stream_name));
@@ -592,12 +581,11 @@ pub async fn put_stream_hot_tier(
592581
body: web::Json<serde_json::Value>,
593582
) -> Result<impl Responder, StreamError> {
594583
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
595-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
584+
if !STREAM_INFO.stream_exists(&stream_name) {
596585
if CONFIG.parseable.mode == Mode::Query {
597-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
598-
if !stream_found {
599-
return Err(StreamError::StreamNotFound(stream_name.clone()));
600-
}
586+
match create_stream_and_schema_from_storage(&stream_name).await {
587+
Ok(true) => {}
588+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
601589
}
602590
} else {
603591
return Err(StreamError::StreamNotFound(stream_name));
@@ -650,12 +638,11 @@ pub async fn put_stream_hot_tier(
650638
pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
651639
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
652640

653-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
641+
if !STREAM_INFO.stream_exists(&stream_name) {
654642
if CONFIG.parseable.mode == Mode::Query {
655-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
656-
if !stream_found {
657-
return Err(StreamError::StreamNotFound(stream_name.clone()));
658-
}
643+
match create_stream_and_schema_from_storage(&stream_name).await {
644+
Ok(true) => {}
645+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
659646
}
660647
} else {
661648
return Err(StreamError::StreamNotFound(stream_name));
@@ -683,12 +670,11 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
683670
pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
684671
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
685672

686-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
673+
if !STREAM_INFO.stream_exists(&stream_name) {
687674
if CONFIG.parseable.mode == Mode::Query {
688-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
689-
if !stream_found {
690-
return Err(StreamError::StreamNotFound(stream_name.clone()));
691-
}
675+
match create_stream_and_schema_from_storage(&stream_name).await {
676+
Ok(true) => {}
677+
Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())),
692678
}
693679
} else {
694680
return Err(StreamError::StreamNotFound(stream_name));

server/src/handlers/http/modal/ingest/ingester_logstream.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ pub async fn retention_cleanup(
2424
) -> Result<impl Responder, StreamError> {
2525
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
2626
let storage = CONFIG.storage().get_object_store();
27-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
28-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
29-
if !stream_found {
30-
return Err(StreamError::StreamNotFound(stream_name.clone()));
31-
}
32-
}
27+
if !metadata::STREAM_INFO.stream_exists(&stream_name)
28+
&& !create_stream_and_schema_from_storage(&stream_name)
29+
.await
30+
.unwrap_or(false)
31+
{
32+
return Err(StreamError::StreamNotFound(stream_name.clone()));
3333
}
34+
3435
let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
3536
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
3637
let first_event_at: Option<String> = res.unwrap_or_default();
@@ -40,12 +41,12 @@ pub async fn retention_cleanup(
4041

4142
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4243
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
43-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
44-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
45-
if !stream_found {
46-
return Err(StreamError::StreamNotFound(stream_name.clone()));
47-
}
48-
}
44+
if !metadata::STREAM_INFO.stream_exists(&stream_name)
45+
&& !create_stream_and_schema_from_storage(&stream_name)
46+
.await
47+
.unwrap_or(false)
48+
{
49+
return Err(StreamError::StreamNotFound(stream_name.clone()));
4950
}
5051

5152
metadata::STREAM_INFO.delete_stream(&stream_name);

server/src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ use crate::{
3232

3333
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3434
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
35-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
36-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
37-
if !stream_found {
38-
return Err(StreamError::StreamNotFound(stream_name.clone()));
39-
}
40-
}
35+
if !metadata::STREAM_INFO.stream_exists(&stream_name)
36+
&& !create_stream_and_schema_from_storage(&stream_name)
37+
.await
38+
.unwrap_or(false)
39+
{
40+
return Err(StreamError::StreamNotFound(stream_name.clone()));
4141
}
4242

4343
let objectstore = CONFIG.storage().get_object_store();
@@ -98,14 +98,12 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
9898
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
9999
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
100100

101-
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
102-
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
103-
if !stream_found {
104-
return Err(StreamError::StreamNotFound(stream_name));
105-
}
106-
} else {
107-
return Err(StreamError::StreamNotFound(stream_name));
108-
}
101+
if !metadata::STREAM_INFO.stream_exists(&stream_name)
102+
&& !create_stream_and_schema_from_storage(&stream_name)
103+
.await
104+
.unwrap_or(false)
105+
{
106+
return Err(StreamError::StreamNotFound(stream_name.clone()));
109107
}
110108

111109
let query_string = req.query_string();

server/src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 15 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ use actix_web::{http::header::HeaderMap, HttpRequest};
44
use arrow_schema::{Field, Schema};
55
use bytes::Bytes;
66
use http::StatusCode;
7-
use relative_path::RelativePathBuf;
87

98
use crate::{
10-
catalog::snapshot::Snapshot,
119
handlers::{
1210
http::logstream::error::{CreateStreamError, StreamError},
1311
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
@@ -16,11 +14,7 @@ use crate::{
1614
metadata::{self, STREAM_INFO},
1715
option::{Mode, CONFIG},
1816
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
19-
stats::FullStats,
20-
storage::{
21-
object_storage::{schema_path, stream_json_path},
22-
LogStream, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY,
23-
},
17+
storage::{LogStream, ObjectStoreFormat, StreamType},
2418
validator,
2519
};
2620

@@ -404,49 +398,15 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
404398
name: stream_name.to_owned(),
405399
}) {
406400
let mut stream_metadata = ObjectStoreFormat::default();
407-
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
408-
let stream_obs = storage
409-
.get_objects(
410-
Some(&path),
411-
Box::new(|file_name| {
412-
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
413-
}),
414-
)
415-
.await
416-
.into_iter()
417-
.next();
418-
if let Some(stream_obs) = stream_obs {
419-
let stream_ob = &stream_obs[0];
420-
let stream_ob_metdata = serde_json::from_slice::<ObjectStoreFormat>(stream_ob)?;
421-
stream_metadata = ObjectStoreFormat {
422-
stats: FullStats::default(),
423-
snapshot: Snapshot::default(),
424-
..stream_ob_metdata
425-
};
426-
427-
let stream_metadata_bytes = serde_json::to_vec(&stream_metadata)?.into();
428-
storage
429-
.put_object(&stream_json_path(stream_name), stream_metadata_bytes)
430-
.await?;
401+
let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
402+
if !stream_metadata_bytes.is_empty() {
403+
stream_metadata = serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?;
431404
}
432405

433406
let mut schema = Arc::new(Schema::empty());
434-
let schema_obs = storage
435-
.get_objects(
436-
Some(&path),
437-
Box::new(|file_name| {
438-
file_name.starts_with(".ingestor") && file_name.ends_with("schema")
439-
}),
440-
)
441-
.await
442-
.into_iter()
443-
.next();
444-
if let Some(schema_obs) = schema_obs {
445-
let schema_ob = &schema_obs[0];
446-
storage
447-
.put_object(&schema_path(stream_name), schema_ob.clone())
448-
.await?;
449-
schema = serde_json::from_slice::<Arc<Schema>>(schema_ob)?;
407+
let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?;
408+
if !schema_bytes.is_empty() {
409+
schema = serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?;
450410
}
451411

452412
let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
@@ -459,31 +419,14 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
459419
static_schema.insert(field_name, field);
460420
}
461421

462-
let time_partition = if stream_metadata.time_partition.is_some() {
463-
stream_metadata.time_partition.as_ref().unwrap()
464-
} else {
465-
""
466-
};
467-
let time_partition_limit = if stream_metadata.time_partition_limit.is_some() {
468-
stream_metadata.time_partition_limit.as_ref().unwrap()
469-
} else {
470-
""
471-
};
472-
let custom_partition = if stream_metadata.custom_partition.is_some() {
473-
stream_metadata.custom_partition.as_ref().unwrap()
474-
} else {
475-
""
476-
};
477-
let static_schema_flag = if stream_metadata.static_schema_flag.is_some() {
478-
stream_metadata.static_schema_flag.as_ref().unwrap()
479-
} else {
480-
""
481-
};
482-
let stream_type = if stream_metadata.stream_type.is_some() {
483-
stream_metadata.stream_type.as_ref().unwrap()
484-
} else {
485-
""
486-
};
422+
let time_partition = stream_metadata.time_partition.as_deref().unwrap_or("");
423+
let time_partition_limit = stream_metadata
424+
.time_partition_limit
425+
.as_deref()
426+
.unwrap_or("");
427+
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
428+
let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
429+
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
487430

488431
metadata::STREAM_INFO.add_stream(
489432
stream_name.to_string(),

server/src/storage/azure_blob.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,23 +273,22 @@ impl BlobStore {
273273
.iter()
274274
.flat_map(|path| path.parts())
275275
.map(|name| name.as_ref().to_string())
276-
.filter(|name| name != PARSEABLE_ROOT_DIRECTORY)
277-
.filter(|name| name != USERS_ROOT_DIR)
276+
.filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR)
278277
.collect::<Vec<_>>();
278+
279279
for stream in streams {
280280
let stream_path =
281281
object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY));
282282
let resp = self.client.list_with_delimiter(Some(&stream_path)).await?;
283-
let stream_files: Vec<String> = resp
283+
if resp
284284
.objects
285285
.iter()
286-
.filter(|name| name.location.filename().unwrap().ends_with("stream.json"))
287-
.map(|name| name.location.to_string())
288-
.collect();
289-
if !stream_files.is_empty() {
286+
.any(|name| name.location.filename().unwrap().ends_with("stream.json"))
287+
{
290288
result_file_list.push(LogStream { name: stream });
291289
}
292290
}
291+
293292
Ok(result_file_list)
294293
}
295294

0 commit comments

Comments
 (0)