Skip to content

Commit 49d2308

Browse files
author
Devdutt Shenoi
committed
refactor: get stream handle first and then metadata
1 parent 1ebc843 commit 49d2308

File tree

15 files changed

+247
-452
lines changed

15 files changed

+247
-452
lines changed

src/catalog/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -280,14 +280,11 @@ async fn create_manifest(
280280
}
281281
};
282282
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
283-
if let Err(err) = PARSEABLE
284-
.streams
285-
.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
286-
{
287-
error!(
288-
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
289-
stream_name
290-
);
283+
match PARSEABLE.get_stream(stream_name) {
284+
Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
285+
Err(err) => error!(
286+
"Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
287+
),
291288
}
292289
}
293290
}
@@ -334,7 +331,7 @@ pub async fn remove_manifest_from_snapshot(
334331
let manifests = &mut meta.snapshot.manifest_list;
335332
// Filter out items whose manifest_path contains any of the dates_to_delete
336333
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
337-
PARSEABLE.streams.reset_first_event_at(stream_name)?;
334+
PARSEABLE.get_stream(stream_name)?.reset_first_event_at();
338335
meta.first_event_at = None;
339336
storage.put_snapshot(stream_name, meta.snapshot).await?;
340337
}
@@ -396,8 +393,8 @@ pub async fn get_first_event(
396393
meta.first_event_at = Some(first_event_at.clone());
397394
storage.put_stream_manifest(stream_name, &meta).await?;
398395
PARSEABLE
399-
.streams
400-
.set_first_event_at(stream_name, &first_event_at)?;
396+
.get_stream(stream_name)?
397+
.set_first_event_at(&first_event_at);
401398
}
402399
}
403400
}

src/connectors/kafka/processor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@ impl ParseableSinkProcessor {
5555
.create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
5656
.await?;
5757

58-
let schema = PARSEABLE.streams.get_schema_raw(stream_name)?;
59-
let time_partition = PARSEABLE.streams.get_time_partition(stream_name)?;
60-
let static_schema_flag = PARSEABLE.streams.get_static_schema_flag(stream_name)?;
61-
let schema_version = PARSEABLE.streams.get_schema_version(stream_name)?;
58+
let stream = PARSEABLE.get_stream(stream_name)?;
59+
let schema = stream.get_schema_raw();
60+
let time_partition = stream.get_time_partition();
61+
let static_schema_flag = stream.get_static_schema_flag();
62+
let schema_version = stream.get_schema_version();
6263

6364
let (json_vec, total_payload_size) = Self::json_vec(records);
6465
let batch_json_event = json::Event {

src/handlers/airplane.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ impl FlightService for AirServiceImpl {
112112
let table_name = table_name[0].clone();
113113

114114
let schema = PARSEABLE
115-
.streams
116-
.get_schema(&table_name)
117-
.map_err(|err| Status::failed_precondition(err.to_string()))?;
115+
.get_stream(&table_name)
116+
.map_err(|err| Status::failed_precondition(err.to_string()))?
117+
.get_schema();
118118

119119
let options = IpcWriteOptions::default();
120120
let schema_result = SchemaAsIpc::new(&schema, &options)

src/handlers/http/llm.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub async fn make_llm_request(body: web::Json<AiPrompt>) -> Result<HttpResponse,
9090
};
9191

9292
let stream_name = &body.stream;
93-
let schema = PARSEABLE.streams.get_schema(stream_name)?;
93+
let schema = PARSEABLE.get_stream(stream_name)?.get_schema();
9494
let filtered_schema = schema
9595
.flattened_fields()
9696
.into_iter()

src/handlers/http/logstream.rs

Lines changed: 68 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -128,21 +128,19 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
128128
pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
129129
let stream_name = stream_name.into_inner();
130130

131-
match PARSEABLE.streams.get_schema(&stream_name) {
132-
Ok(_) => {}
133-
Err(_) if PARSEABLE.options.mode == Mode::Query => {
134-
if !PARSEABLE
135-
.create_stream_and_schema_from_storage(&stream_name)
136-
.await?
137-
{
138-
return Err(StreamNotFound(stream_name.clone()).into());
139-
}
140-
}
141-
Err(err) => return Err(err.into()),
142-
};
131+
// Ensure parseable is aware of stream in distributed mode
132+
if PARSEABLE.options.mode == Mode::Query
133+
&& PARSEABLE
134+
.create_stream_and_schema_from_storage(&stream_name)
135+
.await?
136+
{
137+
return Err(StreamNotFound(stream_name.clone()).into());
138+
}
139+
140+
let stream = PARSEABLE.get_stream(&stream_name)?;
143141
match update_schema_when_distributed(&vec![stream_name.clone()]).await {
144142
Ok(_) => {
145-
let schema = PARSEABLE.streams.get_schema(&stream_name)?;
143+
let schema = stream.get_schema();
146144
Ok((web::Json(schema), StatusCode::OK))
147145
}
148146
Err(err) => Err(StreamError::Custom {
@@ -168,34 +166,24 @@ pub async fn put_stream(
168166

169167
pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
170168
let stream_name = stream_name.into_inner();
171-
if !PARSEABLE.streams.contains(&stream_name) {
172-
// For query mode, if the stream not found in memory map,
173-
//check if it exists in the storage
174-
//create stream and schema from storage
175-
if PARSEABLE.options.mode == Mode::Query {
176-
match PARSEABLE
177-
.create_stream_and_schema_from_storage(&stream_name)
178-
.await
179-
{
180-
Ok(true) => {}
181-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
182-
}
183-
} else {
184-
return Err(StreamNotFound(stream_name).into());
185-
}
186-
}
187-
let retention = PARSEABLE.streams.get_retention(&stream_name);
188-
189-
match retention {
190-
Ok(retention) => {
191-
if let Some(retention) = retention {
192-
Ok((web::Json(retention), StatusCode::OK))
193-
} else {
194-
Ok((web::Json(Retention::default()), StatusCode::OK))
195-
}
169+
// For query mode, if the stream not found in memory map,
170+
//check if it exists in the storage
171+
//create stream and schema from storage
172+
if PARSEABLE.options.mode == Mode::Query {
173+
match PARSEABLE
174+
.create_stream_and_schema_from_storage(&stream_name)
175+
.await
176+
{
177+
Ok(true) => {}
178+
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
196179
}
197-
Err(err) => Err(StreamError::from(err)),
198180
}
181+
182+
let retention = PARSEABLE
183+
.get_stream(&stream_name)?
184+
.get_retention()
185+
.unwrap_or_default();
186+
Ok((web::Json(retention), StatusCode::OK))
199187
}
200188

201189
pub async fn put_retention(
@@ -204,22 +192,19 @@ pub async fn put_retention(
204192
) -> Result<impl Responder, StreamError> {
205193
let stream_name = stream_name.into_inner();
206194

207-
if !PARSEABLE.streams.contains(&stream_name) {
208-
// For query mode, if the stream not found in memory map,
209-
//check if it exists in the storage
210-
//create stream and schema from storage
211-
if PARSEABLE.options.mode == Mode::Query {
212-
match PARSEABLE
213-
.create_stream_and_schema_from_storage(&stream_name)
214-
.await
215-
{
216-
Ok(true) => {}
217-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
218-
}
219-
} else {
220-
return Err(StreamNotFound(stream_name).into());
195+
// For query mode, if the stream not found in memory map,
196+
//check if it exists in the storage
197+
//create stream and schema from storage
198+
if PARSEABLE.options.mode == Mode::Query {
199+
match PARSEABLE
200+
.create_stream_and_schema_from_storage(&stream_name)
201+
.await
202+
{
203+
Ok(true) => {}
204+
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
221205
}
222206
}
207+
let stream = PARSEABLE.get_stream(&stream_name)?;
223208

224209
let retention: Retention = match serde_json::from_value(json) {
225210
Ok(retention) => retention,
@@ -232,10 +217,7 @@ pub async fn put_retention(
232217
.put_retention(&stream_name, &retention)
233218
.await?;
234219

235-
PARSEABLE
236-
.streams
237-
.set_retention(&stream_name, retention)
238-
.expect("retention set on existing stream");
220+
stream.set_retention(retention);
239221

240222
Ok((
241223
format!("set retention configuration for log stream {stream_name}"),
@@ -437,28 +419,22 @@ pub async fn put_stream_hot_tier(
437419
Json(mut hottier): Json<StreamHotTier>,
438420
) -> Result<impl Responder, StreamError> {
439421
let stream_name = stream_name.into_inner();
440-
if !PARSEABLE.streams.contains(&stream_name) {
441-
// For query mode, if the stream not found in memory map,
442-
//check if it exists in the storage
443-
//create stream and schema from storage
444-
if PARSEABLE.options.mode == Mode::Query {
445-
match PARSEABLE
446-
.create_stream_and_schema_from_storage(&stream_name)
447-
.await
448-
{
449-
Ok(true) => {}
450-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
451-
}
452-
} else {
453-
return Err(StreamNotFound(stream_name).into());
422+
// For query mode, if the stream not found in memory map,
423+
//check if it exists in the storage
424+
//create stream and schema from storage
425+
if PARSEABLE.options.mode == Mode::Query {
426+
match PARSEABLE
427+
.create_stream_and_schema_from_storage(&stream_name)
428+
.await
429+
{
430+
Ok(true) => {}
431+
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
454432
}
455433
}
456434

457-
if PARSEABLE
458-
.streams
459-
.get_stream_type(&stream_name)
460-
.is_ok_and(|t| t == StreamType::Internal)
461-
{
435+
let stream = PARSEABLE.get_stream(&stream_name)?;
436+
437+
if stream.get_stream_type() == StreamType::Internal {
462438
return Err(StreamError::Custom {
463439
msg: "Hot tier can not be updated for internal stream".to_string(),
464440
status: StatusCode::BAD_REQUEST,
@@ -467,7 +443,7 @@ pub async fn put_stream_hot_tier(
467443

468444
validator::hot_tier(&hottier.size.to_string())?;
469445

470-
PARSEABLE.streams.set_hot_tier(&stream_name, true)?;
446+
stream.set_hot_tier(true);
471447
let Some(hot_tier_manager) = HotTierManager::global() else {
472448
return Err(StreamError::HotTierNotEnabled(stream_name));
473449
};
@@ -526,38 +502,30 @@ pub async fn delete_stream_hot_tier(
526502
) -> Result<impl Responder, StreamError> {
527503
let stream_name = stream_name.into_inner();
528504

529-
if !PARSEABLE.streams.contains(&stream_name) {
530-
// For query mode, if the stream not found in memory map,
531-
//check if it exists in the storage
532-
//create stream and schema from storage
533-
if PARSEABLE.options.mode == Mode::Query {
534-
match PARSEABLE
535-
.create_stream_and_schema_from_storage(&stream_name)
536-
.await
537-
{
538-
Ok(true) => {}
539-
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
540-
}
541-
} else {
542-
return Err(StreamNotFound(stream_name).into());
505+
// For query mode, if the stream not found in memory map,
506+
//check if it exists in the storage
507+
//create stream and schema from storage
508+
if PARSEABLE.options.mode == Mode::Query {
509+
match PARSEABLE
510+
.create_stream_and_schema_from_storage(&stream_name)
511+
.await
512+
{
513+
Ok(true) => {}
514+
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
543515
}
544516
}
545517

546-
let Some(hot_tier_manager) = HotTierManager::global() else {
547-
return Err(StreamError::HotTierNotEnabled(stream_name));
548-
};
549-
550-
if PARSEABLE
551-
.streams
552-
.get_stream_type(&stream_name)
553-
.is_ok_and(|t| t == StreamType::Internal)
554-
{
518+
if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal {
555519
return Err(StreamError::Custom {
556520
msg: "Hot tier can not be deleted for internal stream".to_string(),
557521
status: StatusCode::BAD_REQUEST,
558522
});
559523
}
560524

525+
let Some(hot_tier_manager) = HotTierManager::global() else {
526+
return Err(StreamError::HotTierNotEnabled(stream_name));
527+
};
528+
561529
hot_tier_manager.delete_hot_tier(&stream_name).await?;
562530

563531
Ok((

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,8 @@ pub async fn get_stats(
168168
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;
169169

170170
let ingestor_stats = if PARSEABLE
171-
.streams
172-
.get_stream_type(&stream_name)
173-
.is_ok_and(|t| t == StreamType::UserDefined)
171+
.get_stream(&stream_name)
172+
.is_ok_and(|stream| stream.get_stream_type() == StreamType::UserDefined)
174173
{
175174
Some(fetch_stats_from_ingestors(&stream_name).await?)
176175
} else {

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,14 @@ pub async fn push_logs(
6666
json: Value,
6767
log_source: &LogSource,
6868
) -> Result<(), PostError> {
69-
let time_partition = PARSEABLE.streams.get_time_partition(stream_name)?;
70-
let time_partition_limit = PARSEABLE.streams.get_time_partition_limit(stream_name)?;
71-
let static_schema_flag = PARSEABLE.streams.get_static_schema_flag(stream_name)?;
72-
let custom_partition = PARSEABLE.streams.get_custom_partition(stream_name)?;
73-
let schema_version = PARSEABLE.streams.get_schema_version(stream_name)?;
69+
let stream = PARSEABLE.get_stream(stream_name)?;
70+
let time_partition = stream.get_time_partition();
71+
let time_partition_limit = PARSEABLE
72+
.get_stream(stream_name)?
73+
.get_time_partition_limit();
74+
let static_schema_flag = stream.get_static_schema_flag();
75+
let custom_partition = stream.get_custom_partition();
76+
let schema_version = stream.get_schema_version();
7477

7578
let data = if time_partition.is_some() || custom_partition.is_some() {
7679
convert_array_to_object(

src/handlers/livetail.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ impl FlightService for FlightServiceImpl {
116116
}
117117

118118
let schema = PARSEABLE
119-
.streams
120-
.get_schema(stream)
121-
.map_err(|err| Status::failed_precondition(err.to_string()))?;
119+
.get_stream(stream)
120+
.map_err(|err| Status::failed_precondition(err.to_string()))?
121+
.get_schema();
122122

123123
let rx = LIVETAIL.new_pipe(
124124
Alphanumeric.sample_string(&mut rand::thread_rng(), 32),

0 commit comments

Comments
 (0)