Skip to content

Commit c37fd58

Browse files
committed
update logic of API endpoints after rebase on main
1 parent 807b5b7 commit c37fd58

File tree

5 files changed

+69
-85
lines changed

5 files changed

+69
-85
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ use crate::handlers::http::cluster::utils::{
2323
};
2424
use crate::handlers::http::ingest::PostError;
2525
use crate::handlers::http::logstream::error::StreamError;
26+
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
2627
use crate::option::CONFIG;
2728

2829
use crate::metrics::prom_utils::Metrics;
2930
use crate::storage::ObjectStorageError;
3031
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
3132
use actix_web::http::header;
3233
use actix_web::{HttpRequest, Responder};
34+
use bytes::Bytes;
3335
use http::StatusCode;
3436
use itertools::Itertools;
3537
use relative_path::RelativePathBuf;
@@ -43,7 +45,12 @@ use super::base_path_without_preceding_slash;
4345
use super::modal::IngesterMetadata;
4446

4547
// forward the request to all ingesters to keep them in sync
46-
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
48+
pub async fn sync_streams_with_ingesters(
49+
stream_name: &str,
50+
time_partition: &str,
51+
static_schema: &str,
52+
schema: Bytes,
53+
) -> Result<(), StreamError> {
4754
let ingester_infos = get_ingester_info().await.map_err(|err| {
4855
log::error!("Fatal: failed to get ingester info: {:?}", err);
4956
StreamError::Anyhow(err)
@@ -58,7 +65,15 @@ pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), Stream
5865
stream_name
5966
);
6067

61-
match send_stream_sync_request(&url, ingester.clone()).await {
68+
match send_stream_sync_request(
69+
&url,
70+
ingester.clone(),
71+
time_partition,
72+
static_schema,
73+
schema.clone(),
74+
)
75+
.await
76+
{
6277
Ok(_) => continue,
6378
Err(_) => {
6479
errored = true;
@@ -144,6 +159,9 @@ pub async fn fetch_stats_from_ingesters(
144159
async fn send_stream_sync_request(
145160
url: &str,
146161
ingester: IngesterMetadata,
162+
time_partition: &str,
163+
static_schema: &str,
164+
schema: Bytes,
147165
) -> Result<(), StreamError> {
148166
if !utils::check_liveness(&ingester.domain_name).await {
149167
return Ok(());
@@ -153,7 +171,10 @@ async fn send_stream_sync_request(
153171
let res = client
154172
.put(url)
155173
.header(header::CONTENT_TYPE, "application/json")
174+
.header(TIME_PARTITION_KEY, time_partition)
175+
.header(STATIC_SCHEMA_FLAG, static_schema)
156176
.header(header::AUTHORIZATION, ingester.token)
177+
.body(schema)
157178
.send()
158179
.await
159180
.map_err(|err| {

server/src/handlers/http/cluster/utils.rs

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ use url::Url;
2828
#[derive(Debug, Default, Serialize, Deserialize)]
2929
pub struct QueriedStats {
3030
pub stream: String,
31-
pub creation_time: String,
32-
pub first_event_at: Option<String>,
3331
pub time: DateTime<Utc>,
3432
pub ingestion: IngestionStats,
3533
pub storage: StorageStats,
@@ -38,16 +36,12 @@ pub struct QueriedStats {
3836
impl QueriedStats {
3937
pub fn new(
4038
stream: &str,
41-
creation_time: &str,
42-
first_event_at: Option<String>,
4339
time: DateTime<Utc>,
4440
ingestion: IngestionStats,
4541
storage: StorageStats,
4642
) -> Self {
4743
Self {
4844
stream: stream.to_string(),
49-
creation_time: creation_time.to_string(),
50-
first_event_at,
5145
time,
5246
ingestion,
5347
storage,
@@ -119,26 +113,26 @@ impl StorageStats {
119113

120114
pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
121115
// get the actual creation time
122-
let min_creation_time = stats
123-
.iter()
124-
.map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
125-
.min()
126-
.unwrap(); // should never be None
116+
// let min_creation_time = stats
117+
// .iter()
118+
// .map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
119+
// .min()
120+
// .unwrap(); // should never be None
127121

128122
// get the stream name
129123
let stream_name = stats[0].stream.clone();
130124

131125
// get the first event at
132-
let min_first_event_at = stats
133-
.iter()
134-
.map(|x| match x.first_event_at.as_ref() {
135-
// we can directly unwrap here because
136-
// we are sure that the first_event_at is a valid date
137-
Some(fea) => fea.parse::<DateTime<Utc>>().unwrap(),
138-
None => Utc::now(), // current time ie the max time
139-
})
140-
.min()
141-
.unwrap(); // should never be None
126+
// let min_first_event_at = stats
127+
// .iter()
128+
// .map(|x| match x.first_event_at.as_ref() {
129+
// we can directly unwrap here because
130+
// we are sure that the first_event_at is a valid date
131+
// Some(fea) => fea.parse::<DateTime<Utc>>().unwrap(),
132+
// None => Utc::now(), // current time ie the max time
133+
// })
134+
// .min()
135+
// .unwrap(); // should never be None
142136

143137
let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
144138

@@ -179,8 +173,6 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
179173

180174
QueriedStats::new(
181175
&stream_name,
182-
&min_creation_time.to_string(),
183-
Some(min_first_event_at.to_string()),
184176
min_time,
185177
cumulative_ingestion,
186178
cumulative_storage,

server/src/handlers/http/ingest.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,13 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
156156
}
157157
match &CONFIG.parseable.mode {
158158
Mode::All | Mode::Query => {
159-
super::logstream::create_stream(stream_name.to_string(), "", "", Arc::new(Schema::empty()))
160-
.await?;
159+
super::logstream::create_stream(
160+
stream_name.to_string(),
161+
"",
162+
"",
163+
Arc::new(Schema::empty()),
164+
)
165+
.await?;
161166
}
162167
Mode::Ingest => {
163168
return Err(PostError::Invalid(anyhow::anyhow!(

server/src/handlers/http/logstream.rs

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -146,24 +146,18 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
146146
});
147147
}
148148

149-
150-
if !body.is_empty() {
151-
if static_schema_flag == "true" {
152-
let body_str = std::str::from_utf8(&body).unwrap();
153-
let static_schema: StaticSchema = serde_json::from_str(body_str).unwrap();
154-
let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
155-
if let Ok(parsed_schema) = parsed_schema {
156-
schema = parsed_schema;
157-
} else {
158-
return Err(StreamError::Custom {
159-
msg: format!(
160-
"unable to commit static schema, logstream {stream_name} not created"
161-
),
162-
status: StatusCode::BAD_REQUEST,
163-
});
164-
}
149+
if !body.is_empty() && static_schema_flag == "true" {
150+
let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
151+
let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
152+
if let Ok(parsed_schema) = parsed_schema {
153+
schema = parsed_schema;
154+
} else {
155+
return Err(StreamError::Custom {
156+
msg: format!("unable to commit static schema, logstream {stream_name} not created"),
157+
status: StatusCode::BAD_REQUEST,
158+
});
165159
}
166-
} else if static_schema_flag == "true" {
160+
} else if body.is_empty() && static_schema_flag == "true" {
167161
return Err(StreamError::Custom {
168162
msg: format!(
169163
"please provide schema in the request body for static schema logstream {stream_name}"
@@ -172,9 +166,8 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
172166
});
173167
}
174168

175-
// ! broken
176169
if CONFIG.parseable.mode == Mode::Query {
177-
sync_streams_with_ingesters(&stream_name).await?;
170+
sync_streams_with_ingesters(&stream_name, time_partition, static_schema_flag, body).await?;
178171
}
179172
create_stream(stream_name, time_partition, static_schema_flag, schema).await?;
180173

@@ -333,23 +326,8 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
333326

334327
let time = Utc::now();
335328

336-
/* // ! broken / update
337-
let stats = serde_json::json!({
338-
"stream": stream_name,
339-
"time": time,
340-
"ingestion": {
341-
"count": stats.events,
342-
"size": format!("{} {}", stats.ingestion, "Bytes"),
343-
"format": "json"
344-
},
345-
"storage": {
346-
"size": format!("{} {}", stats.storage, "Bytes"),
347-
"format": "parquet"
348-
}
349-
});
350-
*/
351329
let stats = match &stream_meta.first_event_at {
352-
Some(first_event_at) => {
330+
Some(_) => {
353331
let ingestion_stats = IngestionStats::new(
354332
stats.events,
355333
format!("{} {}", stats.ingestion, "Bytes"),
@@ -358,14 +336,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
358336
let storage_stats =
359337
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
360338

361-
QueriedStats::new(
362-
&stream_name,
363-
&stream_meta.created_at,
364-
Some(first_event_at.to_owned()),
365-
time,
366-
ingestion_stats,
367-
storage_stats,
368-
)
339+
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
369340
}
370341

371342
None => {
@@ -377,14 +348,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
377348
let storage_stats =
378349
StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet");
379350

380-
QueriedStats::new(
381-
&stream_name,
382-
&stream_meta.created_at,
383-
None,
384-
time,
385-
ingestion_stats,
386-
storage_stats,
387-
)
351+
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
388352
}
389353
};
390354
let stats = if let Some(mut ingester_stats) = ingester_stats {
@@ -505,6 +469,8 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
505469
static_schema_flag: stream_meta.static_schema_flag.clone(),
506470
};
507471

472+
// get the other info from
473+
508474
Ok((web::Json(stream_info), StatusCode::OK))
509475
}
510476

server/src/storage/staging.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,13 @@ impl StorageDir {
170170
.port();
171171
let filename = filename.rsplit_once('.').unwrap();
172172
let filename = format!("{}.{}.{}", filename.0, port, filename.1);
173-
/*
174-
let file_stem = path.file_stem().unwrap().to_str().unwrap();
175-
let random_string =
176-
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20);
177-
let (_, filename) = file_stem.split_once('.').unwrap();
178-
let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
179-
*/
173+
/*
174+
let file_stem = path.file_stem().unwrap().to_str().unwrap();
175+
let random_string =
176+
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20);
177+
let (_, filename) = file_stem.split_once('.').unwrap();
178+
let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
179+
*/
180180

181181
let mut parquet_path = path.to_owned();
182182
parquet_path.set_file_name(filename);

0 commit comments

Comments
 (0)