Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/hstreamdb/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub enum Error {
PBUnwrapError(String),
#[error("No channel is available")]
NoChannelAvailable,
#[error("{0}")]
BadArgument(String),
}

#[derive(Debug, thiserror::Error)]
Expand Down
57 changes: 38 additions & 19 deletions src/x/hstreamdb-erl-nifs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::oneshot;
mod runtime;

rustler::atoms! {
none, gzip, zstd,
compression_type, none, gzip, zstd,
concurrency_limit, len, size
}

Expand Down Expand Up @@ -106,14 +106,12 @@ pub fn create_stream(
pub fn try_start_producer(
url: String,
stream_name: String,
compression_type: Atom,
settings: Term,
) -> hstreamdb::Result<ResourceArc<NifAppender>> {
let (sender, receiver) = oneshot::channel();
let (request_sender, request_receiver) =
unbounded_channel::<Option<(Record, oneshot::Sender<AppendResultType>)>>();
let compression_type = atom_to_compression_type(compression_type);
let (concurrency_limit, flush_settings) = new_producer_settings(settings);
let (compression_type, concurrency_limit, flush_settings) = new_producer_settings(settings)?;
let future = async move {
let xs = async move {
let mut client = Client::new(
Expand Down Expand Up @@ -164,10 +162,9 @@ pub fn start_producer<'a>(
env: Env<'a>,
url: String,
stream_name: String,
compression_type: Atom,
settings: Term,
) -> NifResult<Term<'a>> {
try_start_producer(url, stream_name, compression_type, settings)
try_start_producer(url, stream_name, settings)
.map(|x| Encoder::encode(&(ok(), x), env))
.map_err(|err| rustler::Error::Term(Box::new(err.to_string())))
}
Expand Down Expand Up @@ -223,33 +220,48 @@ fn await_append_result(env: Env, x: ResourceArc<AppendResultFuture>) -> Term {
result.encode(env)
}

fn atom_to_compression_type(compression_type: Atom) -> CompressionType {
fn atom_to_compression_type(compression_type: Atom) -> Option<CompressionType> {
if compression_type == none() {
CompressionType::None
Some(CompressionType::None)
} else if compression_type == gzip() {
CompressionType::Gzip
Some(CompressionType::Gzip)
} else if compression_type == zstd() {
CompressionType::Zstd
Some(CompressionType::Zstd)
} else {
panic!()
None
}
}

fn new_producer_settings(proplists: Term) -> (usize, FlushSettings) {
let proplists = proplists.into_list_iterator().unwrap();
fn new_producer_settings(
proplists: Term,
) -> hstreamdb::Result<(CompressionType, usize, FlushSettings)> {
let proplists = proplists
.into_list_iterator()
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
let mut concurrency_limit_v = None;
let mut len_v = usize::MAX;
let mut size_v = usize::MAX;
let mut compression_type_v: Atom = none();

for x in proplists {
if x.is_tuple() {
let (k, v): (Atom, usize) = x.decode().unwrap();
let (k, v): (Atom, Term) = x
.decode()
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
if k == concurrency_limit() {
concurrency_limit_v = Some(v)
concurrency_limit_v = v.decode().ok()
} else if k == len() {
len_v = v;
len_v = v
.decode()
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
} else if k == size() {
size_v = v;
size_v = v
.decode()
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
} else if k == compression_type() {
compression_type_v = v
.decode()
.map_err(|err| hstreamdb::Error::BadArgument(format!("{err:?}")))?;
}
}
}
Expand All @@ -258,11 +270,18 @@ fn new_producer_settings(proplists: Term) -> (usize, FlushSettings) {
len_v = 0;
size_v = 0;
}
(
let compression_type_v = atom_to_compression_type(compression_type_v).ok_or_else(|| {
hstreamdb::Error::BadArgument(format!(
"no match for compression type `{compression_type_v:?}`"
))
})?;

Ok((
compression_type_v,
concurrency_limit_v.unwrap_or(16),
FlushSettings {
len: len_v,
size: size_v,
},
)
))
}
12 changes: 8 additions & 4 deletions x/hstreamdb_erl/src/hstreamdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

-on_load(init/0).

-export([create_stream/5, start_producer/4, stop_producer/1, append/3, await_append_result/1]).
-export([create_stream/5, start_producer/3, stop_producer/1, append/3, await_append_result/1]).

-export_type([producer/0, compression_type/0]).

-type producer() :: any().
-type append_result() :: any().
-type compression_type() :: none | gzip | zstd.
-type producer_setting() ::
{compression_type, compression_type()}
| {concurrency_limit, pos_integer()}
| {len, non_neg_integer()}
| {size, non_neg_integer()}.

init() ->
ok = erlang:load_nif("../../target/release/libhstreamdb_erl_nifs", 0),
Expand All @@ -30,11 +35,10 @@ create_stream(ServerUrl, StreamName, ReplicationFactor, BacklogDuration, ShardCo
%% Start a producer for `StreamName' on the HStreamDB server at `ServerUrl'.
%% `ProducerSettings' is a list of producer_setting() tuples (compression_type,
%% concurrency_limit, len, size).  The real implementation is supplied by the
%% NIF loaded in init/0; this Erlang body is only the not-yet-loaded placeholder.
-spec start_producer(ServerUrl, StreamName, ProducerSettings) -> Result when
    ServerUrl :: binary(),
    StreamName :: binary(),
    ProducerSettings :: [producer_setting()],
    Result :: {ok, producer()} | {error, binary()}.
start_producer(ServerUrl, StreamName, ProducerSettings) ->
    none.

-spec stop_producer(Producer :: producer()) -> ok.
Expand Down