Skip to content

Commit 8325a94

Browse files
authored
erl: add stop_producer (#39)
1 parent 83428c0 commit 8325a94

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

src/x/hstreamdb-erl-nifs/src/lib.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ rustler::atoms! {
1414

1515
rustler::init!(
1616
"hstreamdb",
17-
[create_stream, start_producer, append],
17+
[create_stream, start_producer, stop_producer, append],
1818
load = load
1919
);
2020

2121
#[derive(Clone)]
22-
pub struct NifAppender(UnboundedSender<Record>);
22+
pub struct NifAppender(UnboundedSender<Option<Record>>);
2323

2424
fn load(env: Env, _: Term) -> bool {
2525
resource!(NifAppender, env);
@@ -65,7 +65,7 @@ pub fn start_producer(
6565
compression_type: Atom,
6666
flush_settings: Term,
6767
) -> ResourceArc<NifAppender> {
68-
let (request_sender, request_receiver) = unbounded_channel::<Record>();
68+
let (request_sender, request_receiver) = unbounded_channel::<Option<Record>>();
6969
let compression_type = atom_to_compression_type(compression_type);
7070
let flush_settings = new_flush_settings(flush_settings);
7171
let future = async move {
@@ -93,7 +93,10 @@ pub fn start_producer(
9393
let mut request_receiver = request_receiver;
9494
let mut appender = appender;
9595
while let Some(record) = request_receiver.recv().await {
96-
_ = appender.append(record).unwrap()
96+
match record {
97+
Some(record) => _ = appender.append(record).unwrap(),
98+
None => request_receiver.close(),
99+
}
97100
}
98101
});
99102
producer.start().await
@@ -102,14 +105,21 @@ pub fn start_producer(
102105
ResourceArc::new(NifAppender(request_sender))
103106
}
104107

108+
#[rustler::nif]
109+
fn stop_producer(producer: ResourceArc<NifAppender>) -> Atom {
110+
let producer = &producer.0;
111+
producer.send(None).unwrap();
112+
ok()
113+
}
114+
105115
#[rustler::nif]
106116
fn append(producer: ResourceArc<NifAppender>, partition_key: String, raw_payload: String) -> Atom {
107117
let record = Record {
108118
partition_key,
109119
payload: hstreamdb::Payload::RawRecord(raw_payload.into_bytes()),
110120
};
111121
let producer = &producer.0;
112-
producer.send(record).unwrap();
122+
producer.send(Some(record)).unwrap();
113123
ok()
114124
}
115125

x/hstreamdb_erl/src/hstreamdb.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
-on_load(init/0).
66

7-
-export([create_stream/5, start_producer/4, append/3]).
7+
-export([create_stream/5, start_producer/4, stop_producer/1, append/3]).
88

99
-export_type([producer/0, compression_type/0]).
1010

@@ -36,6 +36,11 @@ create_stream(ServerUrl, StreamName, ReplicationFactor, BacklogDuration, ShardCo
3636
start_producer(ServerUrl, StreamName, CompressionType, FlushSettings) ->
3737
none.
3838

39+
-spec stop_producer(
40+
Producer :: producer()
41+
) -> ok.
42+
stop_producer(Producer) -> none.
43+
3944
-spec append(Producer :: producer(), PartitionKey :: binary(), RawPayload :: binary()) ->
4045
ok.
4146
append(Producer, PartitionKey, RawPayload) ->

0 commit comments

Comments
 (0)