Skip to content

Commit bde79a5

Browse files
authored
x: try to export NIFs via rustler for Erlang client (#27)
* erl: init * erl: wip
1 parent 857d8cd commit bde79a5

File tree

8 files changed

+226
-9
lines changed

8 files changed

+226
-9
lines changed

Cargo.lock

Lines changed: 66 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ members = [
44
"src/hstreamdb-pb",
55
"src/utils/hstreamdb-test-utils",
66
"src/utils/workspace-hack",
7+
"src/x/hstreamdb-erl-nifs",
78
]

src/hstreamdb/src/common.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,17 @@ pub enum Payload {
9898
RawRecord(Vec<u8>),
9999
}
100100

101+
impl From<Vec<u8>> for Payload {
102+
fn from(payload: Vec<u8>) -> Self {
103+
Self::RawRecord(payload)
104+
}
105+
}
106+
107+
impl From<prost_types::Struct> for Payload {
108+
fn from(payload: prost_types::Struct) -> Self {
109+
Self::HRecord(payload)
110+
}
111+
}
112+
101113
pub(crate) type ShardId = u64;
102114
pub type PartitionKey = String;

src/utils/hstreamdb-test-utils/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ env_logger = "0.9.0"
99

1010
anyhow = "1.0.62"
1111
rand = "0.8.5"
12+
workspace-hack = { version = "0.1", path = "../workspace-hack" }

src/utils/workspace-hack/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ log = { version = "0.4", default-features = false, features = ["std"] }
2020
prost = { version = "0.11", features = ["prost-derive", "std"] }
2121
prost-types = { version = "0.11", features = ["std"] }
2222
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "small_rng", "std", "std_rng"] }
23-
regex = { version = "1", default-features = false, features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode-bool"] }
24-
regex-syntax = { version = "0.6", default-features = false, features = ["unicode-bool"] }
23+
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
24+
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
2525
tokio = { version = "1", features = ["bytes", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "rt", "rt-multi-thread", "sync", "time", "tokio-macros"] }
2626

2727
[build-dependencies]
@@ -31,8 +31,8 @@ indexmap = { version = "1", default-features = false, features = ["std"] }
3131
log = { version = "0.4", default-features = false, features = ["std"] }
3232
prost = { version = "0.11", features = ["prost-derive", "std"] }
3333
prost-types = { version = "0.11", features = ["std"] }
34-
regex = { version = "1", default-features = false, features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode-bool"] }
35-
regex-syntax = { version = "0.6", default-features = false, features = ["unicode-bool"] }
34+
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
35+
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
3636
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }
3737

3838
### END HAKARI SECTION
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "hstreamdb-erl-nifs"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[lib]
7+
crate-type = ["cdylib"]
8+
9+
[dependencies]
10+
workspace-hack = { version = "0.1", path = "../../utils/workspace-hack" }
11+
hstreamdb = { path = "../../hstreamdb" }
12+
13+
log = "0.4.17"
14+
env_logger = "0.9.0"
15+
16+
once_cell = "1.14.0"
17+
rustler = "0.26.0"
18+
tokio = { version = "1.21.0", features = ["rt-multi-thread"] }
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use hstreamdb::client::Client;
2+
use hstreamdb::producer::FlushSettings;
3+
use hstreamdb::{CompressionType, Record, Stream};
4+
use rustler::{resource, Atom, Env, ResourceArc, Term};
5+
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
6+
7+
mod runtime;
8+
9+
rustler::atoms! {
10+
none, gzip, zstd
11+
}
12+
13+
#[derive(Clone)]
14+
pub struct NifAppender(UnboundedSender<Record>);
15+
16+
fn load(env: Env, _: Term) -> bool {
17+
resource!(NifAppender, env);
18+
true
19+
}
20+
21+
#[rustler::nif]
22+
pub fn create_stream(
23+
url: String,
24+
stream_name: String,
25+
replication_factor: u32,
26+
backlog_duration: u32,
27+
shard_count: u32,
28+
) {
29+
let future = async move {
30+
let mut client = Client::new(url).await.unwrap();
31+
client
32+
.create_stream(Stream {
33+
stream_name,
34+
replication_factor,
35+
backlog_duration,
36+
shard_count,
37+
})
38+
.await
39+
.unwrap()
40+
};
41+
_ = runtime::spawn(future)
42+
}
43+
44+
#[rustler::nif]
45+
pub fn start_producer(
46+
url: String,
47+
stream_name: String,
48+
compression_type: Atom,
49+
) -> ResourceArc<NifAppender> {
50+
let (request_sender, request_receiver) = unbounded_channel::<Record>();
51+
let future = async move {
52+
let compression_type = atom_to_compression_type(compression_type);
53+
let flush_settings = FlushSettings { len: 0, size: 0 };
54+
55+
let mut client = Client::new(url).await.unwrap();
56+
let (appender, mut producer) = client
57+
.new_producer(stream_name, compression_type, flush_settings)
58+
.await
59+
.unwrap();
60+
61+
_ = tokio::spawn(async move {
62+
let mut request_receiver = request_receiver;
63+
let mut appender = appender;
64+
while let Some(record) = request_receiver.recv().await {
65+
appender.append(record).unwrap()
66+
}
67+
});
68+
producer.start().await
69+
};
70+
_ = runtime::spawn(future);
71+
ResourceArc::new(NifAppender(request_sender))
72+
}
73+
74+
#[rustler::nif]
75+
fn append(producer: ResourceArc<NifAppender>, partition_key: String, raw_payload: String) {
76+
let record = Record {
77+
partition_key,
78+
payload: hstreamdb::Payload::RawRecord(raw_payload.into_bytes()),
79+
};
80+
let producer = &producer.0;
81+
producer.send(record).unwrap();
82+
}
83+
84+
pub fn atom_to_compression_type(compression_type: Atom) -> CompressionType {
85+
if compression_type == none() {
86+
CompressionType::None
87+
} else if compression_type == gzip() {
88+
CompressionType::Gzip
89+
} else if compression_type == zstd() {
90+
CompressionType::Zstd
91+
} else {
92+
panic!()
93+
}
94+
}
95+
96+
rustler::init!(
97+
"hstreamdb",
98+
[create_stream, start_producer, append],
99+
load = load
100+
);
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use std::future::Future;
2+
3+
use once_cell::sync::Lazy;
4+
use tokio::runtime::{self, Runtime};
5+
use tokio::task::JoinHandle;
6+
7+
pub(crate) static TOKIO_RT: Lazy<Runtime> = Lazy::new(|| {
8+
runtime::Builder::new_multi_thread()
9+
.enable_all()
10+
.build()
11+
.map_err(|err| {
12+
log::error!("failed to init Tokio runtime: {err}");
13+
err
14+
})
15+
.unwrap()
16+
});
17+
18+
pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output>
19+
where
20+
T: Future + Send + 'static,
21+
T::Output: Send + 'static,
22+
{
23+
TOKIO_RT.spawn(future)
24+
}

0 commit comments

Comments
 (0)