Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions src/x/hstreamdb-erl-nifs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::mem;
use std::sync::Arc;

use hstreamdb::appender::Appender;
use hstreamdb::client::Client;
use hstreamdb::producer::{FlushCallback, FlushSettings};
use hstreamdb::{ChannelProviderSettings, CompressionType, Record, RecordId, Stream};
Expand All @@ -19,15 +20,15 @@ rustler::atoms! {
max_batch_len, max_batch_size, batch_deadline,
on_flush, flush_result,
record_id,
create_stream_reply, start_producer_reply, append_reply, await_append_result_reply
create_stream_reply, start_producer_reply, append_reply, await_append_result_reply, stop_producer_reply
}

rustler::init!(
"hstreamdb",
[
async_create_stream,
async_start_producer,
stop_producer,
async_stop_producer,
async_append,
async_await_append_result
],
Expand All @@ -44,7 +45,7 @@ struct AppendResultFuture(Mutex<Option<AppendResultType>>, OnceCell<AppendResult

type AppendResultType = oneshot::Receiver<Result<RecordId, Arc<hstreamdb::Error>>>;
#[derive(Clone)]
pub struct NifAppender(flume::Sender<Option<(Record, LocalPid)>>);
pub struct NifAppender(flume::Sender<(Record, LocalPid)>, flume::Sender<()>);

fn load(env: Env, _: Term) -> bool {
resource!(NifAppender, env);
Expand Down Expand Up @@ -91,7 +92,8 @@ pub fn try_start_producer(
stream_name: String,
settings: Term,
) -> hstreamdb::common::Result<()> {
let (request_sender, request_receiver) = flume::bounded::<Option<(Record, LocalPid)>>(0);
let (request_sender, request_receiver) = flume::bounded::<(Record, LocalPid)>(0);
let (stop_sender, stop_receiver) = flume::bounded(0);
let ProducerSettings {
compression_type,
concurrency_limit,
Expand Down Expand Up @@ -122,22 +124,17 @@ pub fn try_start_producer(
.await?;

_ = tokio::spawn(async move {
let request_receiver = request_receiver;
let mut appender = appender;
while let Ok(record) = request_receiver.recv_async().await {
match record {
Some((record, pid)) => {
let result_receiver = appender.append(record).await.unwrap();
let result_future = ResourceArc::new(AppendResultFuture(
Mutex::new(Some(result_receiver)),
OnceCell::new(),
));
let mut env = OwnedEnv::new();
env.send_and_clear(&pid, |env| {
(append_reply(), ok(), result_future).encode(env)
})
loop {
tokio::select! {
biased;
_ = stop_receiver.recv_async() => break,
request = request_receiver.recv_async() => {
match request {
Err(_) => break,
Ok((record, pid)) => handle_append(&mut appender, record, &pid).await,
}
}
None => break,
}
}
drop(request_receiver)
Expand All @@ -151,7 +148,7 @@ pub fn try_start_producer(
Ok(()) => (
start_producer_reply(),
ok(),
ResourceArc::new(NifAppender(request_sender)),
ResourceArc::new(NifAppender(request_sender, stop_sender)),
)
.encode(env),
Err(err) => (start_producer_reply(), error(), err.to_string()).encode(env),
Expand All @@ -161,6 +158,16 @@ pub fn try_start_producer(
Ok(())
}

async fn handle_append(appender: &mut Appender, record: Record, pid: &LocalPid) {
let result_receiver = appender.append(record).await.unwrap();
let result_future = ResourceArc::new(AppendResultFuture(
Mutex::new(Some(result_receiver)),
OnceCell::new(),
));
let mut env = OwnedEnv::new();
env.send_and_clear(pid, |env| (append_reply(), ok(), result_future).encode(env))
}

#[rustler::nif]
pub fn async_start_producer<'a>(
env: Env<'a>,
Expand All @@ -175,10 +182,16 @@ pub fn async_start_producer<'a>(
}

#[rustler::nif]
fn stop_producer(producer: ResourceArc<NifAppender>) -> Atom {
let producer = &producer.0;
producer.send(None).unwrap_or(());
ok()
fn async_stop_producer(pid: LocalPid, producer: ResourceArc<NifAppender>) {
let future = async move {
let producer = &producer.1;
let result = producer.send_async(()).await;
OwnedEnv::new().send_and_clear(&pid, |env| match result {
Ok(_) => (stop_producer_reply(), ok()).encode(env),
Err(_) => (stop_producer_reply(), error(), terminated()).encode(env),
})
};
runtime::spawn(future);
}

fn try_append<'a>(
Expand All @@ -197,7 +210,7 @@ fn try_append<'a>(
payload: hstreamdb::Payload::RawRecord(raw_payload),
};
let producer = &producer.0;
if producer.send_async(Some((record, pid))).await.is_err() {
if producer.send_async((record, pid)).await.is_err() {
let mut env = OwnedEnv::new();
env.send_and_clear(&pid, |env| {
(append_reply(), error(), terminated()).encode(env)
Expand Down