Skip to content

Commit c66f09d

Browse files
bug fix for schema sync at server restart
1 parent 8c24d0d commit c66f09d

File tree

4 files changed

+45
-15
lines changed

4 files changed

+45
-15
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,19 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
116116

117117
pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
118118
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
119-
let schema = STREAM_INFO.schema(&stream_name)?;
119+
let schema = if let Ok(schema) = STREAM_INFO.schema(&stream_name) {
120+
schema
121+
} else if CONFIG.parseable.mode == Mode::Query {
122+
let stream_found = create_stream_and_schema_from_storage(&stream_name).await?;
123+
if !stream_found {
124+
return Err(StreamError::StreamNotFound(stream_name.clone()));
125+
} else {
126+
STREAM_INFO.schema(&stream_name)?
127+
}
128+
} else {
129+
return Err(StreamError::StreamNotFound(stream_name));
130+
};
131+
120132
Ok((web::Json(schema), StatusCode::OK))
121133
}
122134

server/src/metadata.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::metrics::{
3232
EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED,
3333
LIFETIME_EVENTS_INGESTED_SIZE,
3434
};
35-
use crate::option::{Mode, CONFIG};
3635
use crate::storage::retention::Retention;
3736
use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType};
3837
use crate::utils::arrow::MergedRecordReader;
@@ -463,9 +462,7 @@ pub async fn load_stream_metadata_on_server_start(
463462
}
464463
let schema =
465464
update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?;
466-
if CONFIG.parseable.mode == Mode::Ingest {
467-
storage.put_schema(stream_name, &schema).await?;
468-
}
465+
storage.put_schema(stream_name, &schema).await?;
469466
//load stats from storage
470467
let stats = meta.stats;
471468
fetch_stats_from_storage(stream_name, stats).await;

server/src/migration.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ use crate::{
2828
metadata::load_stream_metadata_on_server_start,
2929
option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
3030
storage::{
31-
object_storage::{parseable_json_path, stream_json_path},
31+
object_storage::{parseable_json_path, schema_path, stream_json_path},
3232
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
33-
SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY,
33+
STREAM_ROOT_DIRECTORY,
3434
},
3535
};
3636
use arrow_schema::Schema;
@@ -166,15 +166,22 @@ async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
166166

167167
async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
168168
let mut arrow_schema: Schema = Schema::empty();
169-
let query_schema_path =
170-
RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
171-
let schema = if let Ok(schema) = storage.get_object(&query_schema_path).await {
169+
let schema_path = schema_path(stream);
170+
let schema = if let Ok(schema) = storage.get_object(&schema_path).await {
172171
schema
173172
} else {
174-
storage
175-
.create_schema_from_ingestor(stream)
173+
let querier_schema = storage
174+
.create_schema_from_querier(stream)
176175
.await
177-
.unwrap_or_default()
176+
.unwrap_or_default();
177+
if !querier_schema.is_empty() {
178+
querier_schema
179+
} else {
180+
storage
181+
.create_schema_from_ingestor(stream)
182+
.await
183+
.unwrap_or_default()
184+
}
178185
};
179186

180187
let path = stream_json_path(stream);
@@ -222,7 +229,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
222229
let schema = serde_json::from_slice(&schema).ok();
223230
arrow_schema = schema_migration::v1_v4(schema)?;
224231
storage
225-
.put_object(&query_schema_path, to_bytes(&arrow_schema))
232+
.put_object(&schema_path, to_bytes(&arrow_schema))
226233
.await?;
227234
}
228235
Some("v2") => {
@@ -236,7 +243,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
236243
let schema = serde_json::from_slice(&schema)?;
237244
arrow_schema = schema_migration::v2_v4(schema)?;
238245
storage
239-
.put_object(&query_schema_path, to_bytes(&arrow_schema))
246+
.put_object(&schema_path, to_bytes(&arrow_schema))
240247
.await?;
241248
}
242249
Some("v3") => {

server/src/storage/object_storage.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,20 @@ pub trait ObjectStorage: Sync + 'static {
493493
Ok(Bytes::new())
494494
}
495495

496+
async fn create_schema_from_querier(
497+
&self,
498+
stream_name: &str,
499+
) -> Result<Bytes, ObjectStorageError> {
500+
let path =
501+
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
502+
if let Ok(querier_schema_bytes) = self.get_object(&path).await {
503+
self.put_object(&schema_path(stream_name), querier_schema_bytes.clone())
504+
.await?;
505+
return Ok(querier_schema_bytes);
506+
}
507+
Ok(Bytes::new())
508+
}
509+
496510
async fn create_schema_from_ingestor(
497511
&self,
498512
stream_name: &str,

0 commit comments

Comments
 (0)