
Commit 6de55c1

update logic of API endpoints after rebase on main

1 parent 807b5b7 · commit 6de55c1

5 files changed: +60 -83 lines


server/src/handlers/http/cluster/mod.rs

Lines changed: 15 additions & 2 deletions
@@ -23,13 +23,15 @@ use crate::handlers::http::cluster::utils::{
 };
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::logstream::error::StreamError;
+use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
 use crate::option::CONFIG;

 use crate::metrics::prom_utils::Metrics;
 use crate::storage::ObjectStorageError;
 use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use actix_web::http::header;
 use actix_web::{HttpRequest, Responder};
+use bytes::Bytes;
 use http::StatusCode;
 use itertools::Itertools;
 use relative_path::RelativePathBuf;

@@ -43,7 +45,12 @@ use super::base_path_without_preceding_slash;
 use super::modal::IngesterMetadata;

 // forward the request to all ingesters to keep them in sync
-pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
+pub async fn sync_streams_with_ingesters(
+    stream_name: &str,
+    time_partition: &str,
+    static_schema: &str,
+    schema: Bytes,
+) -> Result<(), StreamError> {
     let ingester_infos = get_ingester_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingester info: {:?}", err);
         StreamError::Anyhow(err)

@@ -58,7 +65,7 @@ pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
             stream_name
         );

-        match send_stream_sync_request(&url, ingester.clone()).await {
+        match send_stream_sync_request(&url, ingester.clone(), time_partition, static_schema, schema.clone()).await {
             Ok(_) => continue,
             Err(_) => {
                 errored = true;

@@ -144,6 +151,9 @@ pub async fn fetch_stats_from_ingesters(
 async fn send_stream_sync_request(
     url: &str,
     ingester: IngesterMetadata,
+    time_partition: &str,
+    static_schema: &str,
+    schema: Bytes,
 ) -> Result<(), StreamError> {
     if !utils::check_liveness(&ingester.domain_name).await {
         return Ok(());

@@ -153,7 +163,10 @@ async fn send_stream_sync_request(
     let res = client
         .put(url)
         .header(header::CONTENT_TYPE, "application/json")
+        .header(TIME_PARTITION_KEY, time_partition)
+        .header(STATIC_SCHEMA_FLAG, static_schema)
         .header(header::AUTHORIZATION, ingester.token)
+        .body(schema)
         .send()
         .await
         .map_err(|err| {
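
In effect, each stream's time-partition and static-schema settings now ride along as HTTP headers, with the raw schema bytes as the PUT body. A minimal standalone sketch of that request shape, assuming a reqwest client; the header name strings here are placeholders for the real TIME_PARTITION_KEY and STATIC_SCHEMA_FLAG constants in crate::handlers:

use bytes::Bytes;
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE};

// Assumed header names for this sketch; the actual values come from
// the TIME_PARTITION_KEY and STATIC_SCHEMA_FLAG constants.
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";

// Sketch of the sync request: stream config travels as headers, the
// static schema travels as the raw request body.
async fn put_stream_on_ingester(
    url: &str,
    token: &str,
    time_partition: &str,
    static_schema: &str,
    schema: Bytes,
) -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .put(url)
        .header(CONTENT_TYPE, "application/json")
        .header(TIME_PARTITION_KEY, time_partition)
        .header(STATIC_SCHEMA_FLAG, static_schema)
        .header(AUTHORIZATION, token)
        .body(schema)
        .send()
        .await?;
    // Treat any non-2xx status from the ingester as a failed sync.
    resp.error_for_status()?;
    Ok(())
}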

server/src/handlers/http/cluster/utils.rs

Lines changed: 13 additions & 21 deletions
@@ -28,8 +28,6 @@ use url::Url;
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct QueriedStats {
     pub stream: String,
-    pub creation_time: String,
-    pub first_event_at: Option<String>,
     pub time: DateTime<Utc>,
     pub ingestion: IngestionStats,
     pub storage: StorageStats,

@@ -38,16 +36,12 @@ pub struct QueriedStats {
 impl QueriedStats {
     pub fn new(
         stream: &str,
-        creation_time: &str,
-        first_event_at: Option<String>,
         time: DateTime<Utc>,
         ingestion: IngestionStats,
         storage: StorageStats,
     ) -> Self {
         Self {
             stream: stream.to_string(),
-            creation_time: creation_time.to_string(),
-            first_event_at,
             time,
             ingestion,
             storage,

@@ -119,26 +113,26 @@ impl StorageStats {

 pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
     // get the actual creation time
-    let min_creation_time = stats
-        .iter()
-        .map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
-        .min()
-        .unwrap(); // should never be None
+    // let min_creation_time = stats
+    //     .iter()
+    //     .map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
+    //     .min()
+    //     .unwrap(); // should never be None

     // get the stream name
     let stream_name = stats[0].stream.clone();

     // get the first event at
-    let min_first_event_at = stats
-        .iter()
-        .map(|x| match x.first_event_at.as_ref() {
+    // let min_first_event_at = stats
+    //     .iter()
+    //     .map(|x| match x.first_event_at.as_ref() {
         // we can directly unwrap here because
         // we are sure that the first_event_at is a valid date
-        Some(fea) => fea.parse::<DateTime<Utc>>().unwrap(),
-        None => Utc::now(), // current time ie the max time
-    })
-    .min()
-    .unwrap(); // should never be None
+    //     Some(fea) => fea.parse::<DateTime<Utc>>().unwrap(),
+    //     None => Utc::now(), // current time ie the max time
+    //     })
+    //     .min()
+    //     .unwrap(); // should never be None

     let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

@@ -179,8 +173,6 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {

     QueriedStats::new(
         &stream_name,
-        &min_creation_time.to_string(),
-        Some(min_first_event_at.to_string()),
         min_time,
         cumulative_ingestion,
         cumulative_storage,
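
With creation_time and first_event_at gone, merging per-ingester stats reduces to: stream name from the first entry, the earliest report time, and summed counters. A simplified sketch with stand-in types (the real QueriedStats nests IngestionStats and StorageStats rather than bare counters):

use chrono::{DateTime, Utc};

// Stand-in for QueriedStats with flattened counters.
struct Stats {
    stream: String,
    time: DateTime<Utc>,
    events: u64,
    storage_bytes: u64,
}

// Sketch of the surviving merge logic: name from the first entry,
// earliest report time, counters summed across all ingesters.
fn merge_stats(stats: Vec<Stats>) -> Stats {
    let stream = stats.first().map(|s| s.stream.clone()).unwrap_or_default();
    let time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
    let events: u64 = stats.iter().map(|x| x.events).sum();
    let storage_bytes: u64 = stats.iter().map(|x| x.storage_bytes).sum();
    Stats { stream, time, events, storage_bytes }
}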

server/src/handlers/http/ingest.rs

Lines changed: 7 additions & 2 deletions
@@ -156,8 +156,13 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
     }
     match &CONFIG.parseable.mode {
         Mode::All | Mode::Query => {
-            super::logstream::create_stream(stream_name.to_string(), "", "", Arc::new(Schema::empty()))
-                .await?;
+            super::logstream::create_stream(
+                stream_name.to_string(),
+                "",
+                "",
+                Arc::new(Schema::empty()),
+            )
+            .await?;
         }
         Mode::Ingest => {
             return Err(PostError::Invalid(anyhow::anyhow!(

server/src/handlers/http/logstream.rs

Lines changed: 18 additions & 51 deletions
@@ -146,24 +146,18 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
         });
     }

-
-    if !body.is_empty() {
-        if static_schema_flag == "true" {
-            let body_str = std::str::from_utf8(&body).unwrap();
-            let static_schema: StaticSchema = serde_json::from_str(body_str).unwrap();
-            let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
-            if let Ok(parsed_schema) = parsed_schema {
-                schema = parsed_schema;
-            } else {
-                return Err(StreamError::Custom {
-                    msg: format!(
-                        "unable to commit static schema, logstream {stream_name} not created"
-                    ),
-                    status: StatusCode::BAD_REQUEST,
-                });
-            }
+    if !body.is_empty() && static_schema_flag == "true" {
+        let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
+        let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
+        if let Ok(parsed_schema) = parsed_schema {
+            schema = parsed_schema;
+        } else {
+            return Err(StreamError::Custom {
+                msg: format!("unable to commit static schema, logstream {stream_name} not created"),
+                status: StatusCode::BAD_REQUEST,
+            });
         }
-    } else if static_schema_flag == "true" {
+    } else if body.is_empty() && static_schema_flag == "true" {
         return Err(StreamError::Custom {
             msg: format!(
                 "please provide schema in the request body for static schema logstream {stream_name}"

@@ -172,9 +166,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
         });
     }

-    // ! broken
+
     if CONFIG.parseable.mode == Mode::Query {
-        sync_streams_with_ingesters(&stream_name).await?;
+        sync_streams_with_ingesters(&stream_name, time_partition, static_schema_flag, body).await?;
     }
     create_stream(stream_name, time_partition, static_schema_flag, schema).await?;

@@ -333,23 +327,8 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {

     let time = Utc::now();

-    /* // ! broken / update
-    let stats = serde_json::json!({
-        "stream": stream_name,
-        "time": time,
-        "ingestion": {
-            "count": stats.events,
-            "size": format!("{} {}", stats.ingestion, "Bytes"),
-            "format": "json"
-        },
-        "storage": {
-            "size": format!("{} {}", stats.storage, "Bytes"),
-            "format": "parquet"
-        }
-    });
-    */
     let stats = match &stream_meta.first_event_at {
-        Some(first_event_at) => {
+        Some(_) => {
             let ingestion_stats = IngestionStats::new(
                 stats.events,
                 format!("{} {}", stats.ingestion, "Bytes"),

@@ -358,14 +337,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
             let storage_stats =
                 StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");

-            QueriedStats::new(
-                &stream_name,
-                &stream_meta.created_at,
-                Some(first_event_at.to_owned()),
-                time,
-                ingestion_stats,
-                storage_stats,
-            )
+            QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
         }

         None => {

@@ -377,14 +349,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
             let storage_stats =
                 StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");

-            QueriedStats::new(
-                &stream_name,
-                &stream_meta.created_at,
-                None,
-                time,
-                ingestion_stats,
-                storage_stats,
-            )
+            QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
         }
     };
     let stats = if let Some(mut ingester_stats) = ingester_stats {

@@ -505,6 +470,8 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamError> {
         static_schema_flag: stream_meta.static_schema_flag.clone(),
     };

+    // get the other info from
+
     Ok((web::Json(stream_info), StatusCode::OK))
 }
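
The collapsed guards in put_stream spell out three cases: a body with the flag set parses and commits a static schema, the flag without a body is rejected, and anything else proceeds with an empty dynamic schema. A condensed sketch of that decision, with simplified stand-ins for StaticSchema and the Arrow conversion (the real code uses serde_json::from_slice and convert_static_schema_to_arrow_schema):

use std::sync::Arc;

// Stand-in schema type; the real code builds an arrow_schema::Schema.
type Schema = Vec<String>;

// Stand-in for JSON parsing plus Arrow conversion.
fn parse_static_schema(body: &[u8]) -> Result<Schema, String> {
    std::str::from_utf8(body)
        .map(|s| s.split(',').map(str::to_owned).collect())
        .map_err(|e| e.to_string())
}

fn resolve_schema(body: &[u8], static_schema_flag: &str, stream_name: &str) -> Result<Arc<Schema>, String> {
    if !body.is_empty() && static_schema_flag == "true" {
        // Flag set and a body present: commit the parsed schema or fail.
        parse_static_schema(body)
            .map(Arc::new)
            .map_err(|_| format!("unable to commit static schema, logstream {stream_name} not created"))
    } else if body.is_empty() && static_schema_flag == "true" {
        // Flag set but nothing to parse: reject the request.
        Err(format!("please provide schema in the request body for static schema logstream {stream_name}"))
    } else {
        // No static schema requested: start with an empty, dynamic schema.
        Ok(Arc::new(Schema::new()))
    }
}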

server/src/storage/staging.rs

Lines changed: 7 additions & 7 deletions
@@ -170,13 +170,13 @@ impl StorageDir {
             .port();
         let filename = filename.rsplit_once('.').unwrap();
         let filename = format!("{}.{}.{}", filename.0, port, filename.1);
-    /*
-    let file_stem = path.file_stem().unwrap().to_str().unwrap();
-    let random_string =
-        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20);
-    let (_, filename) = file_stem.split_once('.').unwrap();
-    let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
-    */
+        /*
+        let file_stem = path.file_stem().unwrap().to_str().unwrap();
+        let random_string =
+            rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20);
+        let (_, filename) = file_stem.split_once('.').unwrap();
+        let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
+        */

         let mut parquet_path = path.to_owned();
         parquet_path.set_file_name(filename);
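
The live path above the commented-out block splits the staged filename at its last dot and splices in the server's port, so parquet written by different ingester processes on one host cannot collide. A standalone sketch of that rename, with a hypothetical staged file name:

use std::path::PathBuf;

fn main() {
    // Hypothetical staged arrows file and ingester port for this sketch.
    let path = PathBuf::from("/staging/date=2024-01-01.front.arrows");
    let port = 8000u16;

    // Mirror of the live logic: split on the last '.', then splice the
    // port between stem and extension.
    let name = path.file_name().unwrap().to_str().unwrap();
    let (stem, ext) = name.rsplit_once('.').unwrap();
    let filename = format!("{}.{}.{}", stem, port, ext);

    let mut parquet_path = path.clone();
    parquet_path.set_file_name(filename);
    // Prints /staging/date=2024-01-01.front.8000.arrows
    println!("{}", parquet_path.display());
}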
