
Commit 83428c0

producer: result for each append (#36)
* try to add result (need to fix tests)
* fix tests
* fix
1 parent 151ee22 commit 83428c0

6 files changed (+127, -42 lines)

src/hstreamdb/src/appender.rs

Lines changed: 13 additions & 4 deletions
@@ -1,4 +1,8 @@
-use crate::common::Record;
+use std::sync::Arc;
+
+use tokio::sync::oneshot;
+
+use crate::common::{self, Record};
 use crate::producer::{self, Request};
 
 #[derive(Clone)]
@@ -13,9 +17,14 @@ impl Appender {
 }
 
 impl Appender {
-    pub fn append(&mut self, record: Record) -> Result<(), producer::SendError> {
+    pub fn append(
+        &mut self,
+        record: Record,
+    ) -> Result<oneshot::Receiver<Result<String, Arc<common::Error>>>, producer::SendError> {
+        let (sender, receiver) = oneshot::channel();
         self.request_sender
-            .send(Request(record))
-            .map_err(Into::into)
+            .send(Request(record, sender))
+            .map_err(Into::<producer::SendError>::into)?;
+        Ok(receiver)
     }
 }
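
For callers, `append` stays non-blocking: it enqueues the record and immediately hands back a `oneshot::Receiver` that resolves once the producer flushes the batch containing the record, either to the record id as a `String` or to a shared `Arc<common::Error>`. A minimal usage sketch (appender and stream setup elided; the `Record` shape follows this commit's tests):

```rust
// Hedged sketch of the new calling convention: `append` returns immediately
// with a receiver; awaiting the receiver yields the append outcome.
let receiver = appender
    .append(Record {
        partition_key: "".to_string(),
        payload: hstreamdb::common::Payload::RawRecord(b"hello".to_vec()),
    })
    .unwrap();

// Later, typically after the producer has flushed the batch:
match receiver.await {
    Ok(Ok(record_id)) => println!("appended: record id = {record_id}"),
    Ok(Err(append_err)) => eprintln!("append failed: {append_err}"),
    Err(_recv_err) => eprintln!("producer dropped without reporting a result"),
}
```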

src/hstreamdb/src/producer.rs

Lines changed: 51 additions & 9 deletions
@@ -4,6 +4,7 @@ use std::error::Error;
 use std::fmt::{Debug, Display};
 use std::io::Write;
 use std::mem;
+use std::sync::Arc;
 
 use flate2::write::GzEncoder;
 use flate2::Compression;
@@ -14,19 +15,26 @@ use hstreamdb_pb::{
     HStreamRecordHeader, ListShardsRequest, Shard,
 };
 use prost::Message;
+use tokio::sync::oneshot;
 use tokio::task::JoinHandle;
 use tonic::transport::Channel;
 
 use crate::channel_provider::Channels;
 use crate::common::{self, PartitionKey, Record, ShardId};
 use crate::utils::{self, clear_shard_buffer, lookup_shard, partition_key_to_shard_id};
 
+type ResultVec = Vec<oneshot::Sender<Result<String, Arc<common::Error>>>>;
+
 #[derive(Debug)]
-pub(crate) struct Request(pub(crate) Record);
+pub(crate) struct Request(
+    pub(crate) Record,
+    pub(crate) oneshot::Sender<Result<String, Arc<common::Error>>>,
+);
 
 pub struct Producer {
     tasks: Vec<JoinHandle<()>>,
     shard_buffer: HashMap<ShardId, Vec<Record>>,
+    shard_buffer_result: HashMap<ShardId, ResultVec>,
     shard_buffer_state: HashMap<ShardId, BufferState>,
     shard_urls: HashMap<ShardId, String>,
     request_receiver: tokio::sync::mpsc::UnboundedReceiver<Request>,
@@ -84,6 +92,7 @@ impl Producer {
         let producer = Producer {
             tasks: Vec::new(),
             shard_buffer: HashMap::new(),
+            shard_buffer_result: HashMap::new(),
             shard_buffer_state: HashMap::new(),
             shard_urls: HashMap::new(),
             request_receiver,
@@ -98,7 +107,7 @@ impl Producer {
     }
 
     pub async fn start(&mut self) {
-        while let Some(Request(record)) = self.request_receiver.recv().await {
+        while let Some(Request(record, result_sender)) = self.request_receiver.recv().await {
             let partition_key = record.partition_key.clone();
             match partition_key_to_shard_id(&self.shards, partition_key.clone()) {
                 Err(err) => {
@@ -130,15 +139,25 @@ impl Producer {
                         buffer_state.modify(&record);
                         self.shard_buffer_state.insert(shard_id, buffer_state);
                         self.shard_buffer.insert(shard_id, vec![record]);
+                        self.shard_buffer_result
+                            .insert(shard_id, vec![result_sender]);
                     }
                     Some(buffer) => {
+                        self.shard_buffer_result
+                            .get_mut(&shard_id)
+                            .unwrap()
+                            .push(result_sender);
                         let buffer_state =
                             self.shard_buffer_state.get_mut(&shard_id).unwrap();
                         buffer_state.modify(&record);
                         buffer.push(record);
                         if buffer_state.check(&self.flush_settings) {
                             let buffer =
                                 clear_shard_buffer(&mut self.shard_buffer, shard_id);
+                            let results = clear_shard_buffer(
+                                &mut self.shard_buffer_result,
+                                shard_id,
+                            );
                             self.shard_buffer_state.insert(shard_id, default());
                             let task = tokio::spawn(flush_(
                                 self.channels.clone(),
@@ -147,6 +166,7 @@ impl Producer {
                                 shard_url,
                                 self.compression_type,
                                 buffer,
+                                results,
                             ));
                             self.tasks.push(task);
                         }
@@ -160,6 +180,7 @@ impl Producer {
 
         let mut shard_buffer = mem::take(&mut self.shard_buffer);
         for (shard_id, buffer) in shard_buffer.iter_mut() {
+            let results = self.shard_buffer_result.get_mut(shard_id).unwrap();
            let shard_url = self.shard_urls.get(shard_id);
            let shard_url_is_none = shard_url.is_none();
            match lookup_shard(
@@ -184,6 +205,7 @@ impl Producer {
                         shard_url,
                         self.compression_type,
                         mem::take(buffer),
+                        mem::take(results),
                     ));
                     self.tasks.push(task);
                 }
@@ -206,25 +228,43 @@ async fn flush(
     shard_url: String,
     compression_type: CompressionType,
     buffer: Vec<Record>,
+    results: ResultVec,
 ) -> Result<(), String> {
-    if !buffer.is_empty() {
+    if buffer.is_empty() {
+        Ok(())
+    } else {
         let channel = channels
             .channel_at(shard_url.clone())
             .await
             .map_err(|err| format!("producer connect error: url = {shard_url}, {err}"))?;
-        append(
+        match append(
             channel,
             stream_name,
             shard_id,
             compression_type,
             buffer.to_vec(),
         )
         .await
-        .map_err(|err| format!("producer append error: url = {shard_url}, {err}"))
-        .map(|x| log::debug!("append succeed: len = {}", x.len()))?;
-        Ok(())
-    } else {
-        Ok(())
+        {
+            Err(err) => {
+                let err = Arc::new(err);
+                for sender in results.into_iter() {
+                    sender.send(Err(err.clone())).unwrap_or_else(|err| {
+                        log::error!("return append result error: err = {}", err.unwrap_err())
+                    })
+                }
+                Err(format!("producer append error: url = {shard_url}, {err}"))
+            }
+            Ok(append_result) => {
+                log::debug!("append succeed: len = {}", append_result.len());
+                for (result, sender) in append_result.into_iter().zip(results) {
+                    sender.send(Ok(result)).unwrap_or_else(|err| {
+                        log::error!("return append result error: ok = {}", err.unwrap())
+                    })
+                }
+                Ok(())
+            }
+        }
     }
 }
@@ -235,6 +275,7 @@ async fn flush_(
     shard_url: String,
     compression_type: CompressionType,
     buffer: Vec<Record>,
+    results: ResultVec,
 ) {
     flush(
         channels,
@@ -243,6 +284,7 @@ async fn flush_(
         shard_url,
         compression_type,
         buffer,
+        results,
     )
     .await
    .unwrap_or_else(|err| log::error!("{err}"))
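
The core of the change is the tail of `flush`: every record buffered for a shard has a matching `oneshot::Sender`, and one RPC outcome is fanned out to all of them. Below is a distilled, hypothetical rendering of that pattern (`deliver` and the `std::io::Error` stand-in are illustrative names, not crate API): on success each sender receives its own record id, zipped in append order; on failure a single error is shared through `Arc`, so the error type never has to be `Clone`. The real code additionally logs sends that fail because a caller dropped its receiver.

```rust
use std::sync::Arc;
use tokio::sync::oneshot;

// Fan out one batch outcome to every caller still waiting on its `append`.
fn deliver(
    outcome: Result<Vec<String>, std::io::Error>, // stand-in for the RPC result
    senders: Vec<oneshot::Sender<Result<String, Arc<std::io::Error>>>>,
) {
    match outcome {
        Ok(ids) => {
            // One record id per pending append, in append order.
            for (id, sender) in ids.into_iter().zip(senders) {
                let _ = sender.send(Ok(id)); // receiver may already be dropped
            }
        }
        Err(err) => {
            // Share a single error with every waiter via Arc::clone.
            let err = Arc::new(err);
            for sender in senders {
                let _ = sender.send(Err(err.clone()));
            }
        }
    }
}
```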

src/hstreamdb/src/utils.rs

Lines changed: 5 additions & 5 deletions
@@ -8,7 +8,7 @@ use num_bigint::BigInt;
 use num_traits::Num;
 use tonic::transport::Channel;
 
-use crate::common::{self, PartitionKey, Record, ShardId};
+use crate::common::{self, PartitionKey, ShardId};
 use crate::{format_url, Error};
 
 pub fn record_id_to_string(record_id: &RecordId) -> String {
@@ -39,15 +39,15 @@ pub async fn lookup_shard(
     }
 }
 
-pub fn clear_shard_buffer(
-    shard_buffer: &mut HashMap<ShardId, Vec<Record>>,
+pub fn clear_shard_buffer<A>(
+    shard_buffer: &mut HashMap<ShardId, Vec<A>>,
     shard_id: ShardId,
-) -> Vec<Record> {
+) -> Vec<A> {
     let raw_buffer = shard_buffer.get_mut(&shard_id).unwrap();
     clear_buffer(raw_buffer)
 }
 
-pub fn clear_buffer(buffer: &mut Vec<Record>) -> Vec<Record> {
+pub fn clear_buffer<A>(buffer: &mut Vec<A>) -> Vec<A> {
     let mut new_buffer = Vec::new();
     mem::swap(buffer, &mut new_buffer);
     new_buffer
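
The buffer helpers only change by becoming generic: the same drain-and-reset logic now serves both `Vec<Record>` and the new `ResultVec`. A self-contained sketch of the generalized shape (with `u64` standing in for the crate's `ShardId`, and `mem::take` as the idiomatic equivalent of the swap in the diff):

```rust
use std::collections::HashMap;
use std::mem;

// Drain one shard's buffer, leaving an empty Vec behind for the next batch.
fn clear_shard_buffer<A>(shard_buffer: &mut HashMap<u64, Vec<A>>, shard_id: u64) -> Vec<A> {
    clear_buffer(shard_buffer.get_mut(&shard_id).unwrap())
}

fn clear_buffer<A>(buffer: &mut Vec<A>) -> Vec<A> {
    mem::take(buffer) // equivalent to the swap-with-empty-Vec in the diff
}

fn main() {
    // The same helper now drains both record buffers and result-sender buffers.
    let mut records: HashMap<u64, Vec<&str>> = HashMap::from([(0, vec!["a", "b"])]);
    let mut results: HashMap<u64, Vec<i32>> = HashMap::from([(0, vec![1, 2])]);
    assert_eq!(clear_shard_buffer(&mut records, 0), vec!["a", "b"]);
    assert_eq!(clear_shard_buffer(&mut results, 0), vec![1, 2]);
    assert!(records[&0].is_empty());
}
```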

src/hstreamdb/tests/consumer_test.rs

Lines changed: 23 additions & 8 deletions
@@ -60,27 +60,42 @@ async fn test_consumer() {
         .await
         .unwrap();
 
-    let _ = tokio::spawn(async move {
-        let mut appender = appender;
+    let mut join_handles = Vec::new();
+    for _ in 0..10 {
+        let appender = appender.clone();
+        let join_handle = tokio::spawn(async move {
+            let mut appender = appender;
+            let mut results = Vec::new();
 
-        for _ in 0..10 {
             for _ in 0..100 {
-                appender
+                let result = appender
                     .append(Record {
                         partition_key: "".to_string(),
                         payload: hstreamdb::common::Payload::RawRecord(
                             rand_alphanumeric(20).as_bytes().to_vec(),
                         ),
                     })
                     .unwrap();
+                results.push(result)
             }
-        }
 
-        drop(appender)
-    });
+            drop(appender);
+            results
+        });
+        join_handles.push(join_handle)
+    }
 
     let mut producer = producer;
-    producer.start().await;
+    let producer = producer.start();
+    drop(appender);
+    producer.await;
+
+    for join_handle in join_handles {
+        let join_handle = join_handle.await.unwrap();
+        for result in join_handle {
+            println!("{}", result.await.unwrap().unwrap())
+        }
+    }
 
     let mut stream = client
         .streaming_fetch(
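
Note the reworked shutdown sequence in this test: `producer.start()` is turned into a future first, the main task's `appender` handle is dropped, and only then is the future awaited. `start()` keeps looping as long as any `Appender` clone (and thus any request sender) is alive, so dropping the last clone is what lets the producer drain its buffers and return; only after that are the per-append receivers awaited and their record ids printed.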

src/hstreamdb/tests/producer_test.rs

Lines changed: 34 additions & 15 deletions
@@ -46,23 +46,42 @@ async fn test_producer() {
         .await
         .unwrap();
 
-    let _ = tokio::spawn(async move {
-        let mut appender = appender;
-        for _ in 0..100 {
-            appender
-                .append(Record {
-                    partition_key: "".to_string(),
-                    payload: hstreamdb::common::Payload::RawRecord(
-                        rand_alphanumeric(20).as_bytes().to_vec(),
-                    ),
-                })
-                .unwrap();
-        }
-        drop(appender)
-    });
+    let mut join_handles = Vec::new();
+    for _ in 0..10 {
+        let appender = appender.clone();
+        let join_handle = tokio::spawn(async move {
+            let mut appender = appender;
+            let mut results = Vec::new();
+
+            for _ in 0..100 {
+                let result = appender
+                    .append(Record {
+                        partition_key: "".to_string(),
+                        payload: hstreamdb::common::Payload::RawRecord(
+                            rand_alphanumeric(20).as_bytes().to_vec(),
+                        ),
+                    })
+                    .unwrap();
+                results.push(result)
+            }
+
+            drop(appender);
+            results
+        });
+        join_handles.push(join_handle)
+    }
 
     let mut producer = producer;
-    producer.start().await;
+    let producer = producer.start();
+    drop(appender);
+    producer.await;
+
+    for join_handle in join_handles {
+        let join_handle = join_handle.await.unwrap();
+        for result in join_handle {
+            println!("{}", result.await.unwrap().unwrap())
+        }
+    }
 
     client
         .delete_stream(stream_name, false, true)
src/x/hstreamdb-erl-nifs/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ pub fn start_producer(
         let mut request_receiver = request_receiver;
         let mut appender = appender;
         while let Some(record) = request_receiver.recv().await {
-            appender.append(record).unwrap()
+            _ = appender.append(record).unwrap()
         }
     });
     producer.start().await