Skip to content

Commit 9651bf2

Browse files
authored
channel_provider: tls cfg (#77)
1 parent a5d6f7e commit 9651bf2

File tree

11 files changed

+192
-72
lines changed

11 files changed

+192
-72
lines changed

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[env]
22
RUST_LOG = "hstreamdb"
33

4-
TEST_SERVER_ADDR = "http://127.0.0.1:6570"
4+
TEST_SERVER_ADDR = "hstream://127.0.0.1:6570"

Cargo.lock

Lines changed: 96 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/hstreamdb/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ prost-types = "0.11.1"
2727
prost = "0.11.0"
2828
tokio = { version = "1.21.0", features = ["rt-multi-thread", "parking_lot"] }
2929
tokio-stream = "0.1.9"
30-
tonic = "0.8.0"
30+
tonic = { version = "0.8.2", features = ["tls"] }
3131
url = "2.2.2"
3232
workspace-hack = { version = "0.1", path = "../utils/workspace-hack" }
3333

src/hstreamdb/src/channel_provider.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::collections::HashMap;
2+
use std::default::default;
23
use std::iter::FromIterator;
34

45
use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
56
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
67
use tokio::sync::oneshot;
7-
use tonic::transport::{Channel, Endpoint};
8+
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
89

910
use crate::client::get_available_node_addrs;
1011
use crate::common;
@@ -67,8 +68,39 @@ impl Channels {
6768
}
6869
}
6970

71+
#[derive(Debug, Default)]
7072
pub struct ChannelProviderSettings {
71-
pub concurrency_limit: Option<usize>,
73+
concurrency_limit: Option<usize>,
74+
pub(crate) client_tls_config: Option<ClientTlsConfig>,
75+
}
76+
77+
pub struct ChannelProviderSettingsBuilder(ChannelProviderSettings);
78+
79+
impl ChannelProviderSettings {
80+
pub fn builder() -> ChannelProviderSettingsBuilder {
81+
ChannelProviderSettingsBuilder(default())
82+
}
83+
}
84+
85+
impl ChannelProviderSettingsBuilder {
86+
pub fn build(self) -> ChannelProviderSettings {
87+
let ChannelProviderSettingsBuilder(channel_provider_settings) = self;
88+
channel_provider_settings
89+
}
90+
91+
pub fn set_concurrency_limit(self, concurrency_limit: usize) -> Self {
92+
Self(ChannelProviderSettings {
93+
concurrency_limit: Some(concurrency_limit),
94+
..self.0
95+
})
96+
}
97+
98+
pub fn set_client_tls_config(self, client_tls_config: ClientTlsConfig) -> Self {
99+
Self(ChannelProviderSettings {
100+
client_tls_config: Some(client_tls_config),
101+
..self.0
102+
})
103+
}
72104
}
73105

74106
impl ChannelProvider {
@@ -91,6 +123,9 @@ impl ChannelProvider {
91123
if let Some(concurrency_limit) = settings.concurrency_limit {
92124
endpoint = endpoint.concurrency_limit(concurrency_limit)
93125
}
126+
if let Some(client_tls_config) = settings.client_tls_config.clone() {
127+
endpoint = endpoint.tls_config(client_tls_config)?
128+
}
94129
match endpoint.connect().await {
95130
Err(err) => {
96131
log::warn!("connect to endpoint error: uri = {uri}, {err}");

src/hstreamdb/src/client.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,26 @@ impl Client {
2626
where
2727
Destination: std::convert::Into<String>,
2828
{
29+
const HSTREAM_PREFIX: &str = "hstream";
2930
let server_url = server_url.into();
30-
let url = Url::parse(&server_url)?;
31-
let mut hstream_api_client = HStreamApiClient::connect(server_url).await?;
32-
let url_scheme = url.scheme().to_string();
31+
let (url_scheme, url) = {
32+
let url = Url::parse(&server_url)?;
33+
if url.scheme() == HSTREAM_PREFIX {
34+
let url_scheme = if channel_provider_settings.client_tls_config.is_none() {
35+
"http"
36+
} else {
37+
"https"
38+
};
39+
let server_url = &server_url[7..];
40+
(
41+
url_scheme.to_string(),
42+
Url::parse(format!("{url_scheme}{server_url}").as_str())?,
43+
)
44+
} else {
45+
(url.scheme().to_string(), url)
46+
}
47+
};
48+
let mut hstream_api_client = HStreamApiClient::connect(String::from(url)).await?;
3349
let channels = new_channel_provider(
3450
&url_scheme,
3551
&mut hstream_api_client,
@@ -242,14 +258,9 @@ mod tests {
242258
#[tokio::test(flavor = "multi_thread")]
243259
async fn test_stream_cld() {
244260
let addr = env::var("TEST_SERVER_ADDR").unwrap();
245-
let mut client = Client::new(
246-
addr,
247-
ChannelProviderSettings {
248-
concurrency_limit: None,
249-
},
250-
)
251-
.await
252-
.unwrap();
261+
let mut client = Client::new(addr, ChannelProviderSettings::builder().build())
262+
.await
263+
.unwrap();
253264

254265
let make_stream = |stream_name| Stream {
255266
stream_name,
@@ -282,14 +293,9 @@ mod tests {
282293
#[tokio::test(flavor = "multi_thread")]
283294
async fn test_subscription_cld() {
284295
let addr = env::var("TEST_SERVER_ADDR").unwrap();
285-
let mut client = Client::new(
286-
addr,
287-
ChannelProviderSettings {
288-
concurrency_limit: None,
289-
},
290-
)
291-
.await
292-
.unwrap();
296+
let mut client = Client::new(addr, ChannelProviderSettings::builder().build())
297+
.await
298+
.unwrap();
293299

294300
let make_stream = |stream_name| Stream {
295301
stream_name,

src/hstreamdb/src/flow_controller.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,9 @@ mod tests {
118118
env_logger::init();
119119

120120
let addr = env::var("TEST_SERVER_ADDR").unwrap();
121-
let mut client = Client::new(
122-
addr,
123-
ChannelProviderSettings {
124-
concurrency_limit: None,
125-
},
126-
)
127-
.await
128-
.unwrap();
121+
let mut client = Client::new(addr, ChannelProviderSettings::builder().build())
122+
.await
123+
.unwrap();
129124
let stream_name = format!("stream-{}", rand_alphanumeric(10));
130125
client
131126
.create_stream(Stream {
@@ -145,9 +140,7 @@ mod tests {
145140
.set_max_batch_len(100)
146141
.set_batch_deadline(1000)
147142
.build(),
148-
ChannelProviderSettings {
149-
concurrency_limit: None,
150-
},
143+
ChannelProviderSettings::builder().build(),
151144
None,
152145
)
153146
.await

src/hstreamdb/src/utils.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,9 @@ mod tests {
133133
#[tokio::test(flavor = "multi_thread")]
134134
async fn test_partition_key_to_shard_id() {
135135
let addr = env::var("TEST_SERVER_ADDR").unwrap();
136-
let mut client = Client::new(
137-
addr,
138-
ChannelProviderSettings {
139-
concurrency_limit: None,
140-
},
141-
)
142-
.await
143-
.unwrap();
136+
let mut client = Client::new(addr, ChannelProviderSettings::builder().build())
137+
.await
138+
.unwrap();
144139

145140
let stream_name = rand_alphanumeric(20);
146141

0 commit comments

Comments
 (0)