Skip to content

Commit 0bf7acc

Browse files
enhancement: add stream type to stream definition
1. Add stream_type=UserDefined for user-defined streams. 2. Add stream_type=Internal for pmeta. 3. Migrate existing streams' stream.json from v4 to v5. 4. Add the field stream_type in the migration for existing streams. 5. Add the field stream_type to the GET /logstream/{logstream}/info API response.
1 parent fef8f6d commit 0bf7acc

File tree

10 files changed

+131
-27
lines changed

10 files changed

+131
-27
lines changed

server/src/event.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::sync::Arc;
2727

2828
use self::error::EventError;
2929
pub use self::writer::STREAM_WRITERS;
30-
use crate::{handlers::http::ingest::PostError, metadata};
30+
use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
3131
use chrono::NaiveDateTime;
3232
use std::collections::HashMap;
3333

@@ -45,6 +45,7 @@ pub struct Event {
4545
pub parsed_timestamp: NaiveDateTime,
4646
pub time_partition: Option<String>,
4747
pub custom_partition_values: HashMap<String, String>,
48+
pub stream_type: StreamType,
4849
}
4950

5051
// Events holds the schema related to a each event for a single log stream
@@ -75,6 +76,7 @@ impl Event {
7576
self.rb.clone(),
7677
self.parsed_timestamp,
7778
&self.custom_partition_values,
79+
&self.stream_type,
7880
)?;
7981

8082
metadata::STREAM_INFO.update_stats(
@@ -106,6 +108,7 @@ impl Event {
106108
self.rb.clone(),
107109
self.parsed_timestamp,
108110
&self.custom_partition_values,
111+
&self.stream_type,
109112
)
110113
.map_err(PostError::Event)
111114
}
@@ -122,13 +125,15 @@ impl Event {
122125
rb: RecordBatch,
123126
parsed_timestamp: NaiveDateTime,
124127
custom_partition_values: &HashMap<String, String>,
128+
stream_type: &StreamType,
125129
) -> Result<(), EventError> {
126130
STREAM_WRITERS.append_to_local(
127131
stream_name,
128132
schema_key,
129133
rb,
130134
parsed_timestamp,
131135
custom_partition_values.clone(),
136+
stream_type,
132137
)?;
133138
Ok(())
134139
}

server/src/event/writer.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use std::{
2626
};
2727

2828
use crate::{
29-
handlers::http::cluster::INTERNAL_STREAM_NAME,
3029
option::{Mode, CONFIG},
30+
storage::StreamType,
3131
utils,
3232
};
3333

@@ -92,6 +92,7 @@ impl WriterTable {
9292
record: RecordBatch,
9393
parsed_timestamp: NaiveDateTime,
9494
custom_partition_values: HashMap<String, String>,
95+
stream_type: &StreamType,
9596
) -> Result<(), StreamWriterError> {
9697
let hashmap_guard = self.read().unwrap();
9798

@@ -104,6 +105,7 @@ impl WriterTable {
104105
record,
105106
parsed_timestamp,
106107
&custom_partition_values,
108+
stream_type,
107109
)?;
108110
}
109111
None => {
@@ -118,12 +120,14 @@ impl WriterTable {
118120
record,
119121
parsed_timestamp,
120122
&custom_partition_values,
123+
stream_type,
121124
)?;
122125
}
123126
};
124127
Ok(())
125128
}
126129

130+
#[allow(clippy::too_many_arguments)]
127131
fn handle_existing_writer(
128132
&self,
129133
stream_writer: &Mutex<Writer>,
@@ -132,8 +136,9 @@ impl WriterTable {
132136
record: RecordBatch,
133137
parsed_timestamp: NaiveDateTime,
134138
custom_partition_values: &HashMap<String, String>,
139+
stream_type: &StreamType,
135140
) -> Result<(), StreamWriterError> {
136-
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
141+
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
137142
stream_writer.lock().unwrap().push(
138143
stream_name,
139144
schema_key,
@@ -151,6 +156,7 @@ impl WriterTable {
151156
Ok(())
152157
}
153158

159+
#[allow(clippy::too_many_arguments)]
154160
fn handle_missing_writer(
155161
&self,
156162
mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
@@ -159,10 +165,11 @@ impl WriterTable {
159165
record: RecordBatch,
160166
parsed_timestamp: NaiveDateTime,
161167
custom_partition_values: &HashMap<String, String>,
168+
stream_type: &StreamType,
162169
) -> Result<(), StreamWriterError> {
163170
match map.get(stream_name) {
164171
Some(writer) => {
165-
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
172+
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
166173
writer.lock().unwrap().push(
167174
stream_name,
168175
schema_key,
@@ -175,7 +182,7 @@ impl WriterTable {
175182
}
176183
}
177184
None => {
178-
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
185+
if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
179186
let mut writer = Writer::default();
180187
writer.push(
181188
stream_name,

server/src/handlers/http/ingest.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::localcache::CacheError;
3434
use crate::metadata::error::stream_info::MetadataError;
3535
use crate::metadata::{self, STREAM_INFO};
3636
use crate::option::{Mode, CONFIG};
37-
use crate::storage::{LogStream, ObjectStorageError};
37+
use crate::storage::{LogStream, ObjectStorageError, StreamType};
3838
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3939
use crate::utils::json::convert_array_to_object;
4040
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -57,13 +57,15 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
5757
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
5858
{
5959
let stream_name = stream_name.to_str().unwrap().to_owned();
60-
if stream_name.eq(INTERNAL_STREAM_NAME) {
60+
let internal_stream_names = STREAM_INFO.list_internal_streams();
61+
62+
if internal_stream_names.contains(&stream_name) || stream_name == INTERNAL_STREAM_NAME {
6163
return Err(PostError::Invalid(anyhow::anyhow!(
62-
"Stream {} is an internal stream and cannot be ingested into",
64+
"The stream {} is reserved for internal use and cannot be ingested into",
6365
stream_name
6466
)));
6567
}
66-
create_stream_if_not_exists(&stream_name, false).await?;
68+
create_stream_if_not_exists(&stream_name, StreamType::UserDefined).await?;
6769

6870
flatten_and_push_logs(req, body, stream_name).await?;
6971
Ok(HttpResponse::Ok().finish())
@@ -73,7 +75,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
7375
}
7476

7577
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
76-
create_stream_if_not_exists(&stream_name, true).await?;
78+
create_stream_if_not_exists(&stream_name, StreamType::Internal).await?;
7779
let size: usize = body.len();
7880
let parsed_timestamp = Utc::now().naive_utc();
7981
let (rb, is_first) = {
@@ -100,6 +102,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
100102
parsed_timestamp,
101103
time_partition: None,
102104
custom_partition_values: HashMap::new(),
105+
stream_type: StreamType::Internal,
103106
}
104107
.process()
105108
.await?;
@@ -116,7 +119,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
116119
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
117120
{
118121
let stream_name = stream_name.to_str().unwrap().to_owned();
119-
create_stream_if_not_exists(&stream_name, false).await?;
122+
create_stream_if_not_exists(&stream_name, StreamType::UserDefined).await?;
120123

121124
//flatten logs
122125
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -176,7 +179,8 @@ async fn flatten_and_push_logs(
176179
// fails if the logstream does not exist
177180
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
178181
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
179-
if stream_name.eq(INTERNAL_STREAM_NAME) {
182+
let internal_stream_names = STREAM_INFO.list_internal_streams();
183+
if internal_stream_names.contains(&stream_name) || stream_name == INTERNAL_STREAM_NAME {
180184
return Err(PostError::Invalid(anyhow::anyhow!(
181185
"Stream {} is an internal stream and cannot be ingested into",
182186
stream_name
@@ -202,6 +206,7 @@ pub async fn push_logs_unchecked(
202206
time_partition: None,
203207
is_first_event: true, // NOTE: Maybe should be false
204208
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
209+
stream_type: StreamType::UserDefined,
205210
};
206211
unchecked_event.process_unchecked()?;
207212

@@ -369,6 +374,7 @@ async fn create_process_record_batch(
369374
parsed_timestamp,
370375
time_partition: time_partition.clone(),
371376
custom_partition_values: custom_partition_values.clone(),
377+
stream_type: StreamType::UserDefined,
372378
}
373379
.process()
374380
.await?;
@@ -413,7 +419,7 @@ fn into_event_batch(
413419
// Check if the stream exists and create a new stream if doesn't exist
414420
pub async fn create_stream_if_not_exists(
415421
stream_name: &str,
416-
internal_stream: bool,
422+
stream_type: StreamType,
417423
) -> Result<(), PostError> {
418424
if STREAM_INFO.stream_exists(stream_name) {
419425
return Ok(());
@@ -427,7 +433,7 @@ pub async fn create_stream_if_not_exists(
427433
"",
428434
"",
429435
Arc::new(Schema::empty()),
430-
internal_stream,
436+
stream_type,
431437
)
432438
.await?;
433439
}

server/src/handlers/http/logstream.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::option::validation::bytes_to_human_size;
3434
use crate::option::{Mode, CONFIG};
3535
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
3636
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
37+
use crate::storage::StreamType;
3738
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
3839
use crate::{
3940
catalog::{self, remove_manifest_from_snapshot},
@@ -377,7 +378,7 @@ async fn create_update_stream(
377378
&custom_partition,
378379
&static_schema_flag,
379380
schema,
380-
false,
381+
StreamType::UserDefined,
381382
)
382383
.await?;
383384

@@ -835,11 +836,11 @@ pub async fn create_stream(
835836
custom_partition: &str,
836837
static_schema_flag: &str,
837838
schema: Arc<Schema>,
838-
internal_stream: bool,
839+
stream_type: StreamType,
839840
) -> Result<(), CreateStreamError> {
840841
// fail to proceed if invalid stream name
841-
if !internal_stream {
842-
validator::stream_name(&stream_name)?;
842+
if stream_type != StreamType::Internal {
843+
validator::stream_name(&stream_name, &stream_type)?;
843844
}
844845
// Proceed to create log stream if it doesn't exist
845846
let storage = CONFIG.storage().get_object_store();
@@ -852,6 +853,7 @@ pub async fn create_stream(
852853
custom_partition,
853854
static_schema_flag,
854855
schema.clone(),
856+
stream_type,
855857
)
856858
.await
857859
{
@@ -908,6 +910,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
908910
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
909911

910912
let stream_info: StreamInfo = StreamInfo {
913+
stream_type: stream_meta.stream_type.clone(),
911914
created_at: stream_meta.created_at.clone(),
912915
first_event_at: stream_meta.first_event_at.clone(),
913916
time_partition: stream_meta.time_partition.clone(),

server/src/metadata.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::metrics::{
3636
use crate::option::{Mode, CONFIG};
3737
use crate::storage::retention::Retention;
3838
use crate::storage::{
39-
LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, STREAM_METADATA_FILE_NAME,
39+
LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType, STREAM_METADATA_FILE_NAME,
4040
STREAM_ROOT_DIRECTORY,
4141
};
4242
use crate::utils::arrow::MergedRecordReader;
@@ -62,6 +62,7 @@ pub struct LogStreamMetadata {
6262
pub custom_partition: Option<String>,
6363
pub static_schema_flag: Option<String>,
6464
pub hot_tier_enabled: Option<bool>,
65+
pub stream_type: StreamType,
6566
}
6667

6768
// It is very unlikely that panic will occur when dealing with metadata.
@@ -341,6 +342,7 @@ impl StreamInfo {
341342
custom_partition: meta.custom_partition,
342343
static_schema_flag: meta.static_schema_flag,
343344
hot_tier_enabled: meta.hot_tier_enabled,
345+
stream_type: meta.stream_type,
344346
};
345347

346348
let mut map = self.write().expect(LOCK_EXPECT);
@@ -357,6 +359,15 @@ impl StreamInfo {
357359
.collect()
358360
}
359361

362+
pub fn list_internal_streams(&self) -> Vec<String> {
363+
self.read()
364+
.expect(LOCK_EXPECT)
365+
.iter()
366+
.filter(|(_, v)| v.stream_type != StreamType::UserDefined)
367+
.map(|(k, _)| k.clone())
368+
.collect()
369+
}
370+
360371
pub fn update_stats(
361372
&self,
362373
stream_name: &str,
@@ -471,6 +482,7 @@ pub async fn load_stream_metadata_on_server_start(
471482
custom_partition,
472483
static_schema_flag: meta.static_schema_flag.clone(),
473484
hot_tier_enabled: meta.hot_tier_enabled,
485+
stream_type: meta.stream_type,
474486
};
475487

476488
let mut map = STREAM_INFO.write().expect(LOCK_EXPECT);

server/src/migration.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
144144
match version {
145145
Some("v1") => {
146146
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
147+
stream_metadata_value =
148+
stream_metadata_migration::v4_v5(stream_metadata_value, stream);
147149
storage
148150
.put_object(&path, to_bytes(&stream_metadata_value))
149151
.await?;
@@ -155,6 +157,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
155157
}
156158
Some("v2") => {
157159
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
160+
stream_metadata_value =
161+
stream_metadata_migration::v4_v5(stream_metadata_value, stream);
158162
storage
159163
.put_object(&path, to_bytes(&stream_metadata_value))
160164
.await?;
@@ -167,6 +171,15 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
167171
}
168172
Some("v3") => {
169173
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
174+
stream_metadata_value =
175+
stream_metadata_migration::v4_v5(stream_metadata_value, stream);
176+
storage
177+
.put_object(&path, to_bytes(&stream_metadata_value))
178+
.await?;
179+
}
180+
Some("v4") => {
181+
stream_metadata_value =
182+
stream_metadata_migration::v4_v5(stream_metadata_value, stream);
170183
storage
171184
.put_object(&path, to_bytes(&stream_metadata_value))
172185
.await?;

0 commit comments

Comments
 (0)