Skip to content

Commit a55e14b

Browse files
author
Devdutt Shenoi
committed
fix: load schema version, don't default
1 parent 2de9597 commit a55e14b

File tree

3 files changed

+7
-2
lines changed

3 files changed

+7
-2
lines changed

src/handlers/http/logstream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ pub async fn create_stream(
517517
static_schema_flag.to_string(),
518518
static_schema,
519519
stream_type,
520+
SchemaVersion::V1, // New stream
520521
);
521522
}
522523
Err(err) => {

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
3030
TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
3131
},
32-
metadata::{self, STREAM_INFO},
32+
metadata::{self, SchemaVersion, STREAM_INFO},
3333
option::{Mode, CONFIG},
3434
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
3535
storage::{LogStream, ObjectStoreFormat, StreamType},
@@ -426,6 +426,7 @@ pub async fn create_stream(
426426
static_schema_flag.to_string(),
427427
static_schema,
428428
stream_type,
429+
SchemaVersion::V1, // New stream
429430
);
430431
}
431432
Err(err) => {
@@ -474,6 +475,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
474475
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
475476
let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
476477
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
478+
let schema_version = stream_metadata.schema_version;
477479

478480
metadata::STREAM_INFO.add_stream(
479481
stream_name.to_string(),
@@ -484,6 +486,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
484486
static_schema_flag.to_string(),
485487
static_schema,
486488
stream_type,
489+
schema_version,
487490
);
488491
} else {
489492
return Ok(false);

src/metadata.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ impl StreamInfo {
274274
static_schema_flag: String,
275275
static_schema: HashMap<String, Arc<Field>>,
276276
stream_type: &str,
277+
schema_version: SchemaVersion,
277278
) {
278279
let mut map = self.write().expect(LOCK_EXPECT);
279280
let metadata = LogStreamMetadata {
@@ -304,7 +305,7 @@ impl StreamInfo {
304305
static_schema
305306
},
306307
stream_type: Some(stream_type.to_string()),
307-
schema_version: SchemaVersion::V1,
308+
schema_version,
308309
..Default::default()
309310
};
310311
map.insert(stream_name, metadata);

0 commit comments

Comments
 (0)