Skip to content

Commit 06c0009

Browse files
author
Devdutt Shenoi
committed
associate create_internal_stream_if_not_exists with Parseable
1 parent 12a9ad5 commit 06c0009

File tree

4 files changed

+38
-37
lines changed

4 files changed

+38
-37
lines changed

src/handlers/http/logstream.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818

1919
use self::error::StreamError;
2020
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
21-
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2221
use super::query::update_schema_when_distributed;
23-
use crate::event::format::{override_data_type, LogSource};
24-
use crate::handlers::STREAM_TYPE_KEY;
22+
use crate::event::format::override_data_type;
2523
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
2624
use crate::metadata::SchemaVersion;
2725
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
@@ -35,18 +33,15 @@ use crate::storage::{StreamInfo, StreamType};
3533
use crate::utils::actix::extract_session_key_from_req;
3634
use crate::{stats, validator, LOCK_EXPECT};
3735

38-
use actix_web::http::header::{self, HeaderMap};
3936
use actix_web::http::StatusCode;
4037
use actix_web::web::{Json, Path};
4138
use actix_web::{web, HttpRequest, Responder};
4239
use arrow_json::reader::infer_json_schema_from_iterator;
4340
use bytes::Bytes;
4441
use chrono::Utc;
45-
use http::{HeaderName, HeaderValue};
4642
use itertools::Itertools;
4743
use serde_json::{json, Value};
4844
use std::fs;
49-
use std::str::FromStr;
5045
use std::sync::Arc;
5146
use tracing::warn;
5247

@@ -534,27 +529,6 @@ pub async fn delete_stream_hot_tier(
534529
))
535530
}
536531

537-
pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
538-
if let Ok(stream_exists) = PARSEABLE
539-
.create_stream_if_not_exists(INTERNAL_STREAM_NAME, StreamType::Internal, LogSource::Pmeta)
540-
.await
541-
{
542-
if stream_exists {
543-
return Ok(());
544-
}
545-
let mut header_map = HeaderMap::new();
546-
header_map.insert(
547-
HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
548-
HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
549-
);
550-
header_map.insert(
551-
header::CONTENT_TYPE,
552-
HeaderValue::from_static("application/json"),
553-
);
554-
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
555-
}
556-
Ok(())
557-
}
558532
#[allow(unused)]
559533
fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
560534
match kind {

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::correlation::CORRELATIONS;
2121
use crate::handlers::airplane;
2222
use crate::handlers::http::base_path;
2323
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
24-
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
2524
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
2625
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2726
use crate::handlers::http::{rbac, role};
@@ -100,7 +99,7 @@ impl ParseableServer for QueryServer {
10099
migration::run_migration(&PARSEABLE).await?;
101100

102101
//create internal stream at server start
103-
create_internal_stream_if_not_exists().await?;
102+
PARSEABLE.create_internal_stream_if_not_exists().await?;
104103

105104
if let Err(e) = CORRELATIONS.load().await {
106105
error!("{e}");

src/handlers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
2828
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
2929
const AUTHORIZATION_KEY: &str = "authorization";
3030
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
31-
const STREAM_TYPE_KEY: &str = "x-p-stream-type";
31+
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3232
const OIDC_SCOPE: &str = "openid profile email";
3333
const COOKIE_AGE_DAYS: usize = 7;
3434
const SESSION_COOKIE_NAME: &str = "session";

src/parseable/mod.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
*
1818
*/
1919

20-
use std::{collections::HashMap, num::NonZeroU32, path::PathBuf, sync::Arc};
20+
use std::{collections::HashMap, num::NonZeroU32, path::PathBuf, str::FromStr, sync::Arc};
2121

2222
use actix_web::http::header::HeaderMap;
2323
use arrow_schema::{Field, Schema};
2424
use bytes::Bytes;
2525
use chrono::Local;
2626
use clap::{error::ErrorKind, Parser};
27-
use http::StatusCode;
27+
use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
2828
use once_cell::sync::Lazy;
2929
pub use staging::StagingError;
30-
use streams::{Stream, StreamRef};
30+
use streams::StreamRef;
3131
pub use streams::{StreamNotFound, Streams};
3232
use tracing::error;
3333

@@ -36,10 +36,14 @@ use crate::connectors::kafka::config::KafkaConfig;
3636
use crate::{
3737
cli::{Cli, Options, StorageOptions},
3838
event::format::LogSource,
39-
handlers::http::{
40-
ingest::PostError,
41-
logstream::error::{CreateStreamError, StreamError},
42-
modal::{utils::logstream_utils::PutStreamHeaders, IngestorMetadata},
39+
handlers::{
40+
http::{
41+
cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
42+
ingest::PostError,
43+
logstream::error::{CreateStreamError, StreamError},
44+
modal::{utils::logstream_utils::PutStreamHeaders, IngestorMetadata},
45+
},
46+
STREAM_TYPE_KEY,
4347
},
4448
metadata::{LogStreamMetadata, SchemaVersion},
4549
option::Mode,
@@ -341,6 +345,30 @@ impl Parseable {
341345
Ok(true)
342346
}
343347

348+
pub async fn create_internal_stream_if_not_exists(&self) -> Result<(), StreamError> {
349+
match self
350+
.create_stream_if_not_exists(
351+
INTERNAL_STREAM_NAME,
352+
StreamType::Internal,
353+
LogSource::Pmeta,
354+
)
355+
.await
356+
{
357+
Err(_) | Ok(true) => return Ok(()),
358+
_ => {}
359+
}
360+
361+
let mut header_map = HeaderMap::new();
362+
header_map.insert(
363+
HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
364+
HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
365+
);
366+
header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
367+
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
368+
369+
Ok(())
370+
}
371+
344372
// Check if the stream exists and create a new stream if doesn't exist
345373
pub async fn create_stream_if_not_exists(
346374
&self,

0 commit comments

Comments
 (0)