Skip to content

Commit bc89785

Browse files
updated for distributed fresh deployments
1 parent e99f11b commit bc89785

File tree

9 files changed

+221
-111
lines changed

9 files changed

+221
-111
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,17 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
153153
)));
154154
}
155155
if !STREAM_INFO.stream_exists(&stream_name) {
156-
create_stream_and_schema_from_storage(&stream_name).await?;
156+
if CONFIG.parseable.mode != Mode::All {
157+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
158+
if !stream_found {
159+
return Err(PostError::StreamNotFound(stream_name.clone()));
160+
}
161+
} else {
162+
return Err(PostError::StreamNotFound(stream_name.clone()));
163+
}
164+
} else {
165+
return Err(PostError::StreamNotFound(stream_name.clone()));
166+
}
157167
}
158168

159169
flatten_and_push_logs(req, body, stream_name).await?;

server/src/handlers/http/logstream.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,13 @@ pub async fn put_alert(
184184

185185
if !STREAM_INFO.stream_initialized(&stream_name)? {
186186
if CONFIG.parseable.mode == Mode::Query {
187-
create_stream_and_schema_from_storage(&stream_name).await?;
187+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
188+
if !stream_found {
189+
return Err(StreamError::StreamNotFound(stream_name.clone()));
190+
}
191+
} else {
192+
return Err(StreamError::StreamNotFound(stream_name.clone()));
193+
}
188194
} else {
189195
return Err(StreamError::UninitializedLogstream);
190196
}
@@ -226,7 +232,13 @@ pub async fn get_retention(req: HttpRequest) -> Result<impl Responder, StreamErr
226232
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
227233
if !STREAM_INFO.stream_exists(&stream_name) {
228234
if CONFIG.parseable.mode == Mode::Query {
229-
create_stream_and_schema_from_storage(&stream_name).await?;
235+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
236+
if !stream_found {
237+
return Err(StreamError::StreamNotFound(stream_name.clone()));
238+
}
239+
} else {
240+
return Err(StreamError::StreamNotFound(stream_name.clone()));
241+
}
230242
} else {
231243
return Err(StreamError::StreamNotFound(stream_name));
232244
}
@@ -339,8 +351,14 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
339351
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
340352

341353
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
342-
if CONFIG.parseable.mode == Mode::Query {
343-
create_stream_and_schema_from_storage(&stream_name).await?;
354+
if CONFIG.parseable.mode != Mode::All {
355+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
356+
if !stream_found {
357+
return Err(StreamError::StreamNotFound(stream_name.clone()));
358+
}
359+
} else {
360+
return Err(StreamError::StreamNotFound(stream_name.clone()));
361+
}
344362
} else {
345363
return Err(StreamError::StreamNotFound(stream_name));
346364
}
@@ -513,7 +531,11 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
513531
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
514532
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
515533
if CONFIG.parseable.mode == Mode::Query {
516-
create_stream_and_schema_from_storage(&stream_name).await?;
534+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
535+
if !stream_found {
536+
return Err(StreamError::StreamNotFound(stream_name.clone()));
537+
}
538+
}
517539
} else {
518540
return Err(StreamError::StreamNotFound(stream_name));
519541
}
@@ -560,7 +582,11 @@ pub async fn put_stream_hot_tier(
560582
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
561583
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
562584
if CONFIG.parseable.mode == Mode::Query {
563-
create_stream_and_schema_from_storage(&stream_name).await?;
585+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
586+
if !stream_found {
587+
return Err(StreamError::StreamNotFound(stream_name.clone()));
588+
}
589+
}
564590
} else {
565591
return Err(StreamError::StreamNotFound(stream_name));
566592
}
@@ -614,7 +640,11 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
614640

615641
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
616642
if CONFIG.parseable.mode == Mode::Query {
617-
create_stream_and_schema_from_storage(&stream_name).await?;
643+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
644+
if !stream_found {
645+
return Err(StreamError::StreamNotFound(stream_name.clone()));
646+
}
647+
}
618648
} else {
619649
return Err(StreamError::StreamNotFound(stream_name));
620650
}
@@ -643,7 +673,11 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
643673

644674
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
645675
if CONFIG.parseable.mode == Mode::Query {
646-
create_stream_and_schema_from_storage(&stream_name).await?;
676+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
677+
if !stream_found {
678+
return Err(StreamError::StreamNotFound(stream_name.clone()));
679+
}
680+
}
647681
} else {
648682
return Err(StreamError::StreamNotFound(stream_name));
649683
}

server/src/handlers/http/modal/ingest/ingester_logstream.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,27 @@ pub async fn retention_cleanup(
2525
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
2626
let storage = CONFIG.storage().get_object_store();
2727
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
28-
create_stream_and_schema_from_storage(&stream_name).await?;
28+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
29+
if !stream_found {
30+
return Err(StreamError::StreamNotFound(stream_name.clone()));
31+
}
32+
}
2933
}
3034
let date_list: Vec<String> = serde_json::from_slice(&body).unwrap();
3135
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
32-
let mut first_event_at: Option<String> = None;
33-
if let Err(err) = res {
34-
log::error!("Failed to update manifest list in the snapshot {err:?}")
35-
} else {
36-
first_event_at = res.unwrap();
37-
}
36+
let first_event_at: Option<String> = res.unwrap_or_default();
3837

3938
Ok((first_event_at, StatusCode::OK))
4039
}
4140

4241
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
4342
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
4443
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
45-
create_stream_and_schema_from_storage(&stream_name).await?;
44+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
45+
if !stream_found {
46+
return Err(StreamError::StreamNotFound(stream_name.clone()));
47+
}
48+
}
4649
}
4750

4851
metadata::STREAM_INFO.delete_stream(&stream_name);

server/src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ use crate::{
3333
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3434
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
3535
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
36-
create_stream_and_schema_from_storage(&stream_name).await?;
36+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
37+
if !stream_found {
38+
return Err(StreamError::StreamNotFound(stream_name.clone()));
39+
}
40+
}
3741
}
3842

3943
let objectstore = CONFIG.storage().get_object_store();
@@ -95,7 +99,13 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
9599
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
96100

97101
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
98-
create_stream_and_schema_from_storage(&stream_name).await?;
102+
if let Ok(stream_found) = create_stream_and_schema_from_storage(&stream_name).await {
103+
if !stream_found {
104+
return Err(StreamError::StreamNotFound(stream_name));
105+
}
106+
} else {
107+
return Err(StreamError::StreamNotFound(stream_name));
108+
}
99109
}
100110

101111
let query_string = req.query_string();

server/src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,16 @@ pub async fn create_update_stream(
4747
});
4848
}
4949

50-
if CONFIG.parseable.mode == Mode::Query {
51-
create_stream_and_schema_from_storage(stream_name).await?;
50+
if !metadata::STREAM_INFO.stream_exists(stream_name)
51+
&& CONFIG.parseable.mode == Mode::Query
52+
&& create_stream_and_schema_from_storage(stream_name).await?
53+
{
5254
return Err(StreamError::Custom {
53-
msg: format!(
54-
"Logstream {stream_name} already exists, please create a new log stream with unique name"
55-
),
56-
status: StatusCode::BAD_REQUEST,
57-
});
55+
msg: format!(
56+
"Logstream {stream_name} already exists, please create a new log stream with unique name"
57+
),
58+
status: StatusCode::BAD_REQUEST,
59+
});
5860
}
5961

6062
if update_stream == "true" {
@@ -316,7 +318,6 @@ pub async fn update_custom_partition_in_stream(
316318
}
317319
}
318320
}
319-
320321
let storage = CONFIG.storage().get_object_store();
321322
if let Err(err) = storage
322323
.update_custom_partition_in_stream(&stream_name, custom_partition)
@@ -395,7 +396,7 @@ pub async fn create_stream(
395396
Ok(())
396397
}
397398

398-
pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<(), StreamError> {
399+
pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
399400
// Proceed to create log stream if it doesn't exist
400401
let storage = CONFIG.storage().get_object_store();
401402
let streams = storage.list_streams().await?;
@@ -495,8 +496,8 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
495496
stream_type,
496497
);
497498
} else {
498-
return Err(StreamError::StreamNotFound(stream_name.to_string()));
499+
return Ok(false);
499500
}
500501

501-
Ok(())
502+
Ok(true)
502503
}

server/src/metadata.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::metrics::{
3232
EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
3333
LIFETIME_EVENTS_INGESTED_SIZE,
3434
};
35+
use crate::option::{Mode, CONFIG};
3536
use crate::storage::retention::Retention;
3637
use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType};
3738
use crate::utils::arrow::MergedRecordReader;
@@ -462,7 +463,9 @@ pub async fn load_stream_metadata_on_server_start(
462463
}
463464
let schema =
464465
update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?;
465-
466+
if CONFIG.parseable.mode == Mode::Ingest {
467+
storage.put_schema(stream_name, &schema).await?;
468+
}
466469
//load stats from storage
467470
let stats = meta.stats;
468471
fetch_stats_from_storage(stream_name, stats).await;

server/src/migration.rs

Lines changed: 17 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ mod stream_metadata_migration;
2424
use std::{fs::OpenOptions, sync::Arc};
2525

2626
use crate::{
27-
catalog::snapshot::Snapshot,
2827
hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION},
2928
metadata::load_stream_metadata_on_server_start,
3029
option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
31-
stats::FullStats,
3230
storage::{
33-
object_storage::{parseable_json_path, schema_path, stream_json_path},
34-
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME,
35-
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
31+
object_storage::{parseable_json_path, stream_json_path},
32+
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
33+
SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
3634
},
3735
};
3836
use arrow_schema::Schema;
@@ -165,69 +163,30 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
165163
let schema = if let Ok(schema) = storage.get_object(&query_schema_path).await {
166164
schema
167165
} else {
168-
let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
169-
let schema_obs = storage
170-
.get_objects(
171-
Some(&path),
172-
Box::new(|file_name| {
173-
file_name.starts_with(".ingestor") && file_name.ends_with("schema")
174-
}),
175-
)
166+
storage
167+
.create_schema_from_ingestor(stream)
176168
.await
177-
.into_iter()
178-
.next();
179-
if let Some(schema_obs) = schema_obs {
180-
let schema_ob = &schema_obs[0];
181-
storage
182-
.put_object(&schema_path(stream), schema_ob.clone())
183-
.await?;
184-
schema_ob.clone()
185-
} else {
186-
Bytes::new()
187-
}
169+
.unwrap_or_default()
188170
};
189171

190172
let path = stream_json_path(stream);
191173
let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await {
192174
stream_metadata
193-
} else if CONFIG.parseable.mode != Mode::All {
194-
let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
195-
let stream_metadata_obs = storage
196-
.get_objects(
197-
Some(&path),
198-
Box::new(|file_name| {
199-
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
200-
}),
201-
)
175+
} else {
176+
let querier_stream = storage
177+
.create_stream_from_querier(stream)
202178
.await
203-
.into_iter()
204-
.next();
205-
if let Some(stream_metadata_obs) = stream_metadata_obs {
206-
if !stream_metadata_obs.is_empty() {
207-
let stream_metadata_bytes = &stream_metadata_obs[0];
208-
let stream_ob_metdata =
209-
serde_json::from_slice::<ObjectStoreFormat>(stream_metadata_bytes)?;
210-
let stream_metadata = ObjectStoreFormat {
211-
stats: FullStats::default(),
212-
snapshot: Snapshot::default(),
213-
..stream_ob_metdata
214-
};
215-
216-
let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into();
217-
storage
218-
.put_object(&stream_json_path(stream), stream_metadata_bytes.clone())
219-
.await?;
220-
221-
stream_metadata_bytes
222-
} else {
223-
Bytes::new()
224-
}
179+
.unwrap_or_default();
180+
if !querier_stream.is_empty() {
181+
querier_stream
225182
} else {
226-
Bytes::new()
183+
storage
184+
.create_stream_from_ingestor(stream)
185+
.await
186+
.unwrap_or_default()
227187
}
228-
} else {
229-
Bytes::new()
230188
};
189+
231190
let mut stream_meta_found = true;
232191
if stream_metadata.is_empty() {
233192
if CONFIG.parseable.mode != Mode::Ingest {

0 commit comments

Comments
 (0)