Skip to content

Commit a4cb4da

Browse files
committed
erl: poc
1 parent 7e8f8f5 commit a4cb4da

File tree

7 files changed

+141
-13
lines changed

7 files changed

+141
-13
lines changed

src/x/hstreamdb-erl-nifs/src/lib.rs

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,29 @@
11
use hstreamdb::client::Client;
22
use hstreamdb::producer::FlushSettings;
33
use hstreamdb::{CompressionType, Record, Stream};
4+
use rustler::types::atom::ok;
45
use rustler::{resource, Atom, Env, ResourceArc, Term};
56
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
67

78
mod runtime;
89

910
rustler::atoms! {
10-
none, gzip, zstd
11+
none, gzip, zstd,
12+
len, size
1113
}
1214

15+
rustler::init!(
16+
"hstreamdb",
17+
[create_stream, start_producer, append],
18+
load = load
19+
);
20+
1321
#[derive(Clone)]
1422
pub struct NifAppender(UnboundedSender<Record>);
1523

1624
fn load(env: Env, _: Term) -> bool {
1725
resource!(NifAppender, env);
26+
env_logger::init();
1827
true
1928
}
2029

@@ -25,7 +34,7 @@ pub fn create_stream(
2534
replication_factor: u32,
2635
backlog_duration: u32,
2736
shard_count: u32,
28-
) {
37+
) -> Atom {
2938
let future = async move {
3039
let mut client = Client::new(url).await.unwrap();
3140
client
@@ -38,20 +47,21 @@ pub fn create_stream(
3847
.await
3948
.unwrap()
4049
};
41-
_ = runtime::spawn(future)
50+
_ = runtime::spawn(future);
51+
ok()
4252
}
4353

4454
#[rustler::nif]
4555
pub fn start_producer(
4656
url: String,
4757
stream_name: String,
4858
compression_type: Atom,
59+
flush_settings: Term,
4960
) -> ResourceArc<NifAppender> {
5061
let (request_sender, request_receiver) = unbounded_channel::<Record>();
62+
let compression_type = atom_to_compression_type(compression_type);
63+
let flush_settings = new_flush_settings(flush_settings);
5164
let future = async move {
52-
let compression_type = atom_to_compression_type(compression_type);
53-
let flush_settings = FlushSettings { len: 0, size: 0 };
54-
5565
let mut client = Client::new(url).await.unwrap();
5666
let (appender, mut producer) = client
5767
.new_producer(stream_name, compression_type, flush_settings)
@@ -72,16 +82,17 @@ pub fn start_producer(
7282
}
7383

7484
#[rustler::nif]
75-
fn append(producer: ResourceArc<NifAppender>, partition_key: String, raw_payload: String) {
85+
fn append(producer: ResourceArc<NifAppender>, partition_key: String, raw_payload: String) -> Atom {
7686
let record = Record {
7787
partition_key,
7888
payload: hstreamdb::Payload::RawRecord(raw_payload.into_bytes()),
7989
};
8090
let producer = &producer.0;
8191
producer.send(record).unwrap();
92+
ok()
8293
}
8394

84-
pub fn atom_to_compression_type(compression_type: Atom) -> CompressionType {
95+
fn atom_to_compression_type(compression_type: Atom) -> CompressionType {
8596
if compression_type == none() {
8697
CompressionType::None
8798
} else if compression_type == gzip() {
@@ -93,8 +104,28 @@ pub fn atom_to_compression_type(compression_type: Atom) -> CompressionType {
93104
}
94105
}
95106

96-
rustler::init!(
97-
"hstreamdb",
98-
[create_stream, start_producer, append],
99-
load = load
100-
);
107+
fn new_flush_settings(proplists: Term) -> FlushSettings {
108+
let proplists = proplists.into_list_iterator().unwrap();
109+
let mut len_v = usize::MAX;
110+
let mut size_v = usize::MAX;
111+
112+
for x in proplists {
113+
if x.is_tuple() {
114+
let (k, v): (Atom, usize) = x.decode().unwrap();
115+
if k == len() {
116+
len_v = v;
117+
} else if k == size() {
118+
size_v = v;
119+
}
120+
}
121+
}
122+
123+
if len_v == usize::MAX && size_v == usize::MAX {
124+
len_v = 0;
125+
size_v = 0;
126+
}
127+
FlushSettings {
128+
len: len_v,
129+
size: size_v,
130+
}
131+
}

x/hstreamdb_erl/.gitignore

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
.rebar3
2+
_*
3+
.eunit
4+
*.o
5+
*.beam
6+
*.plt
7+
*.swp
8+
*.swo
9+
.erlang.cookie
10+
ebin
11+
log
12+
erl_crash.dump
13+
.rebar
14+
logs
15+
_build
16+
.idea
17+
*.iml
18+
rebar3.crashdump
19+
*~

x/hstreamdb_erl/rebar.config

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{erl_opts, [debug_info]}.
2+
3+
{deps, []}.
4+
5+
{project_plugins, [
6+
erlfmt
7+
]}.
8+
9+
{erlfmt, [write]}.
10+
11+
{pre_hooks, [
12+
{
13+
compile,
14+
"$(cd ../../ && cargo build --release)"
15+
}
16+
]}.

x/hstreamdb_erl/rebar.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[].

x/hstreamdb_erl/src/hstreamdb.erl

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
-module(hstreamdb).
2+
3+
-on_load(init/0).
4+
5+
-export([create_stream/5, start_producer/4, append/3]).
6+
7+
-export_type([producer/0, compression_type/0]).
8+
9+
-type producer() :: any().
10+
-type compression_type() :: none | gzip | zstd.
11+
12+
init() ->
13+
Load = erlang:load_nif("../../target/release/libhstreamdb_erl_nifs", 0),
14+
io:format("~p~n", [Load]).
15+
16+
-spec create_stream(
17+
ServerUrl :: binary(),
18+
StreamName :: binary(),
19+
ReplicationFactor :: pos_integer(),
20+
BacklogDuration :: pos_integer(),
21+
ShardCount :: pos_integer()
22+
) ->
23+
ok.
24+
create_stream(
25+
_ServerUrl,
26+
_StreamName,
27+
_ReplicationFactor,
28+
_BacklogDuration,
29+
_ShardCount
30+
) ->
31+
none.
32+
33+
-spec start_producer(
34+
ServerUrl :: binary(),
35+
StreamName :: binary(),
36+
CompressionType :: compression_type(),
37+
FlushSettings :: proplists:proplist()
38+
) ->
39+
producer().
40+
start_producer(_ServerUrl, _StreamName, _CompressionType, _FlushSettings) ->
41+
none.
42+
43+
-spec append(Producer :: producer(), PartitionKey :: binary(), RawPayload :: binary()) ->
44+
ok.
45+
append(_Producer, _PartitionKey, _RawPayload) ->
46+
none.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{application, hstreamdb_erl, [
2+
{description, "An OTP library"},
3+
{vsn, "0.1.0"},
4+
{registered, []},
5+
{applications, [
6+
kernel,
7+
stdlib
8+
]},
9+
{env, []},
10+
{modules, []},
11+
{links, []}
12+
]}.

x/hstreamdb_erl/src/hstreamdb_erl.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-module(hstreamdb_erl).
2+
3+
-export([]).

0 commit comments

Comments
 (0)