Skip to content

Commit 1c5657d

Browse files
authored
erl: tls (#82)
1 parent a223b9e commit 1c5657d

File tree

6 files changed

+115
-41
lines changed

6 files changed

+115
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/hstreamdb-pb/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ keywords = ["hstreamdb", "streaming-database", "database-client"]
1313
[dependencies]
1414
prost = "0.11.0"
1515
prost-types = "0.11.1"
16-
tonic = "0.8.0"
16+
tonic = "0.8.2"
1717
workspace-hack = { version = "0.1", path = "../utils/workspace-hack" }
1818

1919
[build-dependencies]

src/hstreamdb/src/channel_provider.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ impl ChannelProviderSettingsBuilder {
9595
})
9696
}
9797

98-
pub fn set_client_tls_config(self, client_tls_config: ClientTlsConfig) -> Self {
98+
pub fn set_tls_config(self, tls_config: ClientTlsConfig) -> Self {
9999
Self(ChannelProviderSettings {
100-
client_tls_config: Some(client_tls_config),
100+
client_tls_config: Some(tls_config),
101101
..self.0
102102
})
103103
}

src/hstreamdb/src/client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use hstreamdb_pb::{
44
CompressionType, DeleteStreamRequest, DeleteSubscriptionRequest, ListStreamsRequest,
55
ListSubscriptionsRequest, LookupSubscriptionRequest, NodeState,
66
};
7-
use tonic::transport::Channel;
7+
use tonic::transport::{Channel, ClientTlsConfig};
88
use tonic::Request;
99
use url::Url;
1010

@@ -16,6 +16,7 @@ use crate::{common, flow_controller, format_url, producer};
1616
pub struct Client {
1717
pub(crate) channels: Channels,
1818
pub(crate) url_scheme: String,
19+
tls_config: Option<ClientTlsConfig>,
1920
}
2021

2122
impl Client {
@@ -46,6 +47,7 @@ impl Client {
4647
}
4748
};
4849
let mut hstream_api_client = HStreamApiClient::connect(String::from(url)).await?;
50+
let tls_config = channel_provider_settings.client_tls_config.clone();
4951
let channels = new_channel_provider(
5052
&url_scheme,
5153
&mut hstream_api_client,
@@ -55,15 +57,19 @@ impl Client {
5557
Ok(Client {
5658
channels,
5759
url_scheme,
60+
tls_config,
5861
})
5962
}
6063
}
6164

6265
impl Client {
6366
async fn new_channel_provider(
6467
&self,
65-
channel_provider_settings: ChannelProviderSettings,
68+
mut channel_provider_settings: ChannelProviderSettings,
6669
) -> common::Result<Channels> {
70+
if channel_provider_settings.client_tls_config.is_none() {
71+
channel_provider_settings.client_tls_config = self.tls_config.clone()
72+
}
6773
new_channel_provider(
6874
&self.url_scheme,
6975
&mut self.channels.channel().await,

src/x/hstreamdb-erl-nifs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ tokio = { version = "1.21.0", features = ["rt-multi-thread", "parking_lot"] }
2020
tokio-stream = "0.1.11"
2121

2222
prost = "0.11.0"
23+
tonic = "0.8.2"

src/x/hstreamdb-erl-nifs/src/lib.rs

Lines changed: 102 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rustler::{
1818
};
1919
use tokio::sync::{oneshot, Mutex, MutexGuard};
2020
use tokio_stream::StreamExt;
21+
use tonic::transport::{Certificate, ClientTlsConfig, Identity};
2122

2223
mod runtime;
2324

@@ -40,6 +41,7 @@ rustler::atoms! {
4041
eos, already_acked,
4142
create_shard_reader_reply, read_shard_reply,
4243
bad_hstream_record,
44+
tls_config,
4345
}
4446

4547
rustler::init!(
@@ -56,6 +58,10 @@ rustler::init!(
5658
async_ack,
5759
async_create_shard_reader,
5860
async_read_shard,
61+
new_client_tls_config,
62+
set_domain_name,
63+
set_ca_certificate,
64+
set_identity,
5965
],
6066
load = load
6167
);
@@ -145,25 +151,61 @@ fn load(env: Env, _: Term) -> bool {
145151
resource!(AppendResultFuture, env);
146152
resource!(NifResponder, env);
147153
resource!(NifShardReaderId, env);
154+
resource!(NifClientTlsConfig, env);
148155
env_logger::init();
149156
true
150157
}
151158

152159
#[rustler::nif]
153-
fn async_start_client(pid: LocalPid, url: String, _conf: Term) {
154-
let future = async move {
155-
let client = Client::new(url, ChannelProviderSettings::builder().build()).await;
156-
OwnedEnv::new().send_and_clear(&pid, |env| match client {
157-
Ok(client) => (
158-
start_client_reply(),
159-
ok(),
160-
ResourceArc::new(NifClient(client)),
161-
)
162-
.encode(env),
163-
Err(err) => (start_client_reply(), error(), err.to_string()).encode(env),
164-
})
165-
};
166-
runtime::spawn(future);
160+
fn async_start_client<'a>(env: Env<'a>, pid: LocalPid, url: String, options: Term) -> Term<'a> {
161+
match from_start_client_options(options) {
162+
Err(err) => (error(), (badarg(), err.to_string())).encode(env),
163+
Ok(channel_provider_settings) => {
164+
let future = async move {
165+
let client = Client::new(url, channel_provider_settings).await;
166+
OwnedEnv::new().send_and_clear(&pid, |env| match client {
167+
Ok(client) => (
168+
start_client_reply(),
169+
ok(),
170+
ResourceArc::new(NifClient(client)),
171+
)
172+
.encode(env),
173+
Err(err) => (start_client_reply(), error(), err.to_string()).encode(env),
174+
})
175+
};
176+
runtime::spawn(future);
177+
ok().to_term(env)
178+
}
179+
}
180+
}
181+
182+
fn from_start_client_options(proplists: Term) -> hstreamdb::Result<ChannelProviderSettings> {
183+
let proplists = proplists
184+
.into_list_iterator()
185+
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
186+
187+
let mut channel_provider_settings = ChannelProviderSettings::builder();
188+
for x in proplists {
189+
if x.is_tuple() {
190+
let (k, v): (Atom, Term) = x
191+
.decode()
192+
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
193+
if k == tls_config() {
194+
let v: ResourceArc<NifClientTlsConfig> = v
195+
.decode()
196+
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
197+
let NifClientTlsConfig(v) = (*v).clone();
198+
channel_provider_settings = channel_provider_settings.set_tls_config(v)
199+
} else if k == concurrency_limit() {
200+
let v: usize = v
201+
.decode()
202+
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
203+
channel_provider_settings = channel_provider_settings.set_concurrency_limit(v)
204+
}
205+
}
206+
}
207+
208+
todo!()
167209
}
168210

169211
#[rustler::nif]
@@ -352,16 +394,14 @@ fn async_stop_producer(pid: LocalPid, producer: ResourceArc<NifAppender>) {
352394
runtime::spawn(future);
353395
}
354396

355-
fn try_append<'a>(
356-
env: Env<'a>,
397+
#[rustler::nif]
398+
fn async_append(
357399
pid: LocalPid,
358400
producer: ResourceArc<NifAppender>,
359401
partition_key: String,
360-
raw_payload: Term,
361-
) -> Result<(), Term<'a>> {
362-
let raw_payload = rustler::Binary::from_term(raw_payload)
363-
.map_err(|err| (badarg(), format!("{err:?}")).encode(env))?
364-
.to_vec();
402+
raw_payload: Binary,
403+
) {
404+
let raw_payload = raw_payload.to_vec();
365405
let future = async move {
366406
let record = Record {
367407
partition_key,
@@ -376,21 +416,6 @@ fn try_append<'a>(
376416
}
377417
};
378418
runtime::spawn(future);
379-
Ok(())
380-
}
381-
382-
#[rustler::nif]
383-
fn async_append<'a>(
384-
env: Env<'a>,
385-
pid: LocalPid,
386-
producer: ResourceArc<NifAppender>,
387-
partition_key: String,
388-
raw_payload: Term,
389-
) -> Term<'a> {
390-
match try_append(env, pid, producer, partition_key, raw_payload) {
391-
Ok(()) => ok().encode(env),
392-
Err(err) => (error(), err).encode(env),
393-
}
394419
}
395420

396421
#[rustler::nif]
@@ -732,3 +757,44 @@ fn async_read_shard(
732757
};
733758
runtime::spawn(future);
734759
}
760+
761+
#[derive(Clone)]
762+
struct NifClientTlsConfig(ClientTlsConfig);
763+
764+
#[rustler::nif]
765+
fn new_client_tls_config() -> ResourceArc<NifClientTlsConfig> {
766+
ResourceArc::new(NifClientTlsConfig(ClientTlsConfig::new()))
767+
}
768+
769+
#[rustler::nif]
770+
fn set_domain_name(
771+
tls_config: ResourceArc<NifClientTlsConfig>,
772+
domain_name: String,
773+
) -> ResourceArc<NifClientTlsConfig> {
774+
let NifClientTlsConfig(tls_config) = (*tls_config).clone();
775+
ResourceArc::new(NifClientTlsConfig(tls_config.domain_name(domain_name)))
776+
}
777+
778+
#[rustler::nif]
779+
fn set_ca_certificate(
780+
tls_config: ResourceArc<NifClientTlsConfig>,
781+
ca_certificate: Binary,
782+
) -> ResourceArc<NifClientTlsConfig> {
783+
let NifClientTlsConfig(tls_config) = (*tls_config).clone();
784+
ResourceArc::new(NifClientTlsConfig(
785+
tls_config.ca_certificate(Certificate::from_pem(ca_certificate.as_slice())),
786+
))
787+
}
788+
789+
#[rustler::nif]
790+
fn set_identity(
791+
tls_config: ResourceArc<NifClientTlsConfig>,
792+
cert: Binary,
793+
key: Binary,
794+
) -> ResourceArc<NifClientTlsConfig> {
795+
let NifClientTlsConfig(tls_config) = &*tls_config;
796+
let tls_config = tls_config.clone();
797+
ResourceArc::new(NifClientTlsConfig(
798+
tls_config.identity(Identity::from_pem(cert.as_slice(), key.as_slice())),
799+
))
800+
}

0 commit comments

Comments
 (0)