
Commit 2f68ca5

Merge branch 'main' into fix-sigterm
2 parents: 123638c + ad977fd

File tree

10 files changed (+97, -14 lines)

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 0 additions & 7 deletions
@@ -1,10 +1,3 @@
 [workspace]
 members = ["server"]
 resolver = "2"
-
-[patch.crates-io]
-# object_store added support for SSE-C headers in:
-# - https://github.com/apache/arrow-rs/pull/6230
-# - https://github.com/apache/arrow-rs/pull/6260
-# But a new version hasn't been published to crates.io for this yet. So, we are using this patch temporarily.
-object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "23b6ff9f432e8e29c08d47a315ba0b7cb8758225" }

server/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ arrow-json = "53.0.0"
 arrow-ipc = { version = "53.0.0", features = ["zstd"] }
 arrow-select = "53.0.0"
 datafusion = "42.0.0"
-object_store = { version = "0.11.0", features = ["cloud", "aws", "azure"] }
+object_store = { version = "0.11.1", features = ["cloud", "aws", "azure"] }
 parquet = "53.0.0"
 arrow-flight = { version = "53.0.0", features = [ "tls" ] }
 tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }

server/src/event/format.rs

Lines changed: 29 additions & 0 deletions
@@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use chrono::DateTime;
+use serde_json::Value;

 use crate::utils::{self, arrow::get_field};

@@ -171,3 +173,30 @@ pub fn update_field_type_in_schema(
         .collect();
     Arc::new(Schema::new(new_schema))
 }
+
+pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
+    let new_schema: Vec<Field> = schema
+        .fields()
+        .iter()
+        .map(|field| {
+            if field.data_type() == &DataType::Utf8 {
+                if let Value::Object(map) = &value {
+                    if let Some(Value::String(s)) = map.get(field.name()) {
+                        if DateTime::parse_from_rfc3339(s).is_ok() {
+                            // Update the field's data type to Timestamp
+                            return Field::new(
+                                field.name().clone(),
+                                DataType::Timestamp(TimeUnit::Millisecond, None),
+                                true,
+                            );
+                        }
+                    }
+                }
+            }
+            // Return the original field if no update is needed
+            Field::new(field.name(), field.data_type().clone(), true)
+        })
+        .collect();
+
+    Schema::new(new_schema)
+}
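
The helper only promotes Utf8 fields whose value in the sampled event parses as RFC 3339; everything else keeps its inferred type. A minimal sketch of that behaviour (not part of the commit; the event content and field names are made up, and it assumes update_data_type_to_datetime from the hunk above is in scope, e.g. via use crate::event::format::update_data_type_to_datetime inside the server crate):

// Sketch only: the event below is hypothetical.
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{DataType, TimeUnit};
use serde_json::{json, Value};

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let event: Value = json!({ "created_at": "2024-09-17T10:00:00Z", "message": "hello" });

    // arrow-json infers "created_at" as Utf8 ...
    let inferred = infer_json_schema_from_iterator(std::iter::once(Ok(event.clone())))?;

    // ... and the new helper rewrites RFC 3339 string fields to Timestamp(Millisecond, None).
    let detected = update_data_type_to_datetime(inferred, event);
    assert_eq!(
        detected.field_with_name("created_at")?.data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
    assert_eq!(detected.field_with_name("message")?.data_type(), &DataType::Utf8);
    Ok(())
}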

server/src/handlers/http/logstream.rs

Lines changed: 22 additions & 0 deletions
@@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
 use super::modal::utils::logstream_utils::create_update_stream;
 use crate::alerts::Alerts;
+use crate::event::format::update_data_type_to_datetime;
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::STREAM_INFO;
@@ -36,6 +37,7 @@ use crate::{metadata, validator};
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, Responder};
+use arrow_json::reader::infer_json_schema_from_iterator;
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
@@ -89,6 +91,26 @@ pub async fn list(_: HttpRequest) -> impl Responder {
     web::Json(res)
 }

+pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
+    let body_val: Value = serde_json::from_slice(&body)?;
+    let value_arr: Vec<Value> = match body_val {
+        Value::Array(arr) => arr,
+        value @ Value::Object(_) => vec![value],
+        _ => {
+            return Err(StreamError::Custom {
+                msg: "please send json events as part of the request".to_string(),
+                status: StatusCode::BAD_REQUEST,
+            })
+        }
+    };
+
+    let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
+    for value in value_arr {
+        schema = update_data_type_to_datetime(schema, value);
+    }
+    Ok((web::Json(schema), StatusCode::OK))
+}
+
 pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let schema = STREAM_INFO.schema(&stream_name)?;
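
Stripped of the actix-web plumbing, the new handler boils down to the sketch below: normalise the body to a list of JSON objects, infer one Arrow schema over the whole batch, then fold update_data_type_to_datetime over the events. This is a hypothetical standalone function with the error handling condensed to Option; the real handler answers 400 Bad Request where this returns None, and serialises the Schema as the JSON response body.

// Sketch of detect_schema's pipeline outside the web layer. Assumes
// update_data_type_to_datetime from server/src/event/format.rs is in scope.
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::Schema;
use serde_json::Value;

fn detect(body_val: Value) -> Option<Schema> {
    // Accept either a single JSON object or an array of objects.
    let value_arr: Vec<Value> = match body_val {
        Value::Array(arr) => arr,
        value @ Value::Object(_) => vec![value],
        _ => return None, // e.g. a bare string or number
    };

    // One inferred schema for the whole batch, then timestamp promotion per event.
    let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).ok()?;
    for value in value_arr {
        schema = update_data_type_to_datetime(schema, value);
    }
    Some(schema)
}

// detect(json!([{"a": 1}, {"a": 2}]))  -> Some(schema with "a" inferred as Int64)
// detect(json!("just a string"))       -> None (400 from the real handler)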

server/src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 0 deletions
@@ -282,6 +282,17 @@ impl QueryServer {
                 web::resource("")
                     .route(web::get().to(logstream::list).authorize(Action::ListStream)),
             )
+            .service(
+                web::scope("/schema/detect").service(
+                    web::resource("")
+                        // POST "/logstream/schema/detect" ==> Detect schema of incoming JSON events
+                        .route(
+                            web::post()
+                                .to(logstream::detect_schema)
+                                .authorize(Action::DetectSchema),
+                        ),
+                ),
+            )
             .service(
                 web::scope("/{logstream}")
                     .service(

server/src/handlers/http/modal/server.rs

Lines changed: 11 additions & 0 deletions
@@ -314,6 +314,17 @@ impl Server {
                 web::resource("")
                     .route(web::get().to(logstream::list).authorize(Action::ListStream)),
             )
+            .service(
+                web::scope("/schema/detect").service(
+                    web::resource("")
+                        // POST "/logstream/schema/detect" ==> Detect schema of incoming JSON events
+                        .route(
+                            web::post()
+                                .to(logstream::detect_schema)
+                                .authorize(Action::DetectSchema),
+                        ),
+                ),
+            )
             .service(
                 web::scope("/{logstream}")
                     .service(
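
Both servers register the same nesting: an empty resource inside web::scope("/schema/detect"), placed ahead of the /{logstream} scope, presumably so that "schema" is never captured as a stream name. The standalone sketch below, with a hypothetical stub handler (not the real detect_schema) and the authorization layer omitted, shows the path this resolves to; in this code base the enclosing scope is the logstream scope under /api/v1, so the full endpoint is presumably POST /api/v1/logstream/schema/detect.

// Route-nesting sketch only; detect_stub stands in for logstream::detect_schema.
use actix_web::{test, web, App, HttpResponse};

async fn detect_stub() -> HttpResponse {
    HttpResponse::Ok().finish()
}

#[actix_web::main]
async fn main() {
    let app = test::init_service(App::new().service(
        web::scope("/logstream").service(
            web::scope("/schema/detect")
                .service(web::resource("").route(web::post().to(detect_stub))),
        ),
    ))
    .await;

    let req = test::TestRequest::post()
        .uri("/logstream/schema/detect")
        .to_request();
    assert!(test::call_service(&app, req).await.status().is_success());
}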

server/src/rbac/role.rs

Lines changed: 3 additions & 0 deletions
@@ -25,6 +25,7 @@ pub enum Action {
     CreateStream,
     ListStream,
     GetStreamInfo,
+    DetectSchema,
     GetSchema,
     GetStats,
     DeleteStream,
@@ -140,6 +141,7 @@ impl RoleBuilder {
             | Action::GetAnalytics => Permission::Unit(action),
             Action::Ingest
             | Action::GetSchema
+            | Action::DetectSchema
             | Action::GetStats
             | Action::GetRetention
             | Action::PutRetention
@@ -214,6 +216,7 @@ pub mod model {
             Action::DeleteStream,
             Action::ListStream,
             Action::GetStreamInfo,
+            Action::DetectSchema,
             Action::GetSchema,
             Action::GetStats,
             Action::GetRetention,

server/src/storage/azure_blob.rs

Lines changed: 9 additions & 2 deletions
@@ -30,7 +30,7 @@ use datafusion::datasource::object_store::{
 };
 use datafusion::execution::runtime_env::RuntimeConfig;
 use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
-use object_store::{ClientOptions, ObjectStore, PutPayload};
+use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
 use std::path::Path as StdPath;
 use url::Url;
@@ -120,10 +120,17 @@ impl AzureBlobConfig {
             .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
             .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));

+        let retry_config = RetryConfig {
+            max_retries: 5,
+            retry_timeout: Duration::from_secs(120),
+            backoff: BackoffConfig::default(),
+        };
+
         let mut builder = MicrosoftAzureBuilder::new()
             .with_endpoint(self.endpoint_url.clone())
             .with_account(&self.account)
-            .with_container_name(&self.container);
+            .with_container_name(&self.container)
+            .with_retry(retry_config);

         if let Some(access_key) = self.access_key.clone() {
             builder = builder.with_access_key(access_key)

server/src/storage/s3.rs

Lines changed: 8 additions & 2 deletions
@@ -28,7 +28,7 @@ use futures::{StreamExt, TryStreamExt};
 use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
-use object_store::{ClientOptions, ObjectStore, PutPayload};
+use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};

 use std::collections::BTreeMap;
@@ -218,13 +218,19 @@ impl S3Config {
         if self.skip_tls {
             client_options = client_options.with_allow_invalid_certificates(true)
         }
+        let retry_config = RetryConfig {
+            max_retries: 5,
+            retry_timeout: Duration::from_secs(30),
+            backoff: BackoffConfig::default(),
+        };

         let mut builder = AmazonS3Builder::new()
             .with_region(&self.region)
             .with_endpoint(&self.endpoint_url)
             .with_bucket_name(&self.bucket_name)
             .with_virtual_hosted_style_request(!self.use_path_style)
-            .with_allow_http(true);
+            .with_allow_http(true)
+            .with_retry(retry_config);

         if self.set_checksum {
             builder = builder.with_checksum_algorithm(Checksum::SHA256)
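
With this change both object-store clients retry failed requests: up to 5 retries per request, inside a total retry budget of 120 s for Azure Blob and 30 s for S3, on top of object_store's default exponential backoff. The sketch below writes the backoff out explicitly for the S3 case; the concrete default values are an assumption taken from the object_store documentation, not part of this diff.

// Sketch only: BackoffConfig::default() expanded to assumed values.
use object_store::aws::AmazonS3Builder;
use object_store::{BackoffConfig, RetryConfig};
use std::time::Duration;

fn s3_builder_with_retry() -> AmazonS3Builder {
    let retry_config = RetryConfig {
        max_retries: 5,                         // maximum retries per request
        retry_timeout: Duration::from_secs(30), // stop retrying a request after this budget
        backoff: BackoffConfig {
            init_backoff: Duration::from_millis(100), // assumed default
            max_backoff: Duration::from_secs(15),     // assumed default
            base: 2.0,                                // assumed default (exponential)
        },
    };

    AmazonS3Builder::from_env().with_retry(retry_config)
}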
