producer: result for each append #36

Merged · 3 commits · Sep 19, 2022
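This PR changes `Appender::append` from fire-and-forget to returning a per-record result channel: each append now hands back a `tokio::sync::oneshot::Receiver` that resolves to the server-assigned record id once the producer flushes the record's batch, or to a shared error if the flush fails. The diffs below carry the whole change: the new `append` signature, the producer-side bookkeeping and result fan-out, a generic buffer-drain helper, and reworked tests that await every result.
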
src/hstreamdb/src/appender.rs (13 additions, 4 deletions)

```diff
@@ -1,4 +1,8 @@
-use crate::common::Record;
+use std::sync::Arc;
+
+use tokio::sync::oneshot;
+
+use crate::common::{self, Record};
 use crate::producer::{self, Request};
 
 #[derive(Clone)]
@@ -13,9 +17,14 @@ impl Appender {
 }
 
 impl Appender {
-    pub fn append(&mut self, record: Record) -> Result<(), producer::SendError> {
+    pub fn append(
+        &mut self,
+        record: Record,
+    ) -> Result<oneshot::Receiver<Result<String, Arc<common::Error>>>, producer::SendError> {
+        let (sender, receiver) = oneshot::channel();
         self.request_sender
-            .send(Request(record))
-            .map_err(Into::into)
+            .send(Request(record, sender))
+            .map_err(Into::<producer::SendError>::into)?;
+        Ok(receiver)
     }
 }
```

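For callers, the append itself stays synchronous and cheap; only awaiting the receiver waits on the flush. A hypothetical caller-side fragment (not part of the diff: it assumes an `appender` and a running producer wired up as in the tests below, with an illustrative payload):

```rust
// Enqueue the record; this returns immediately with a oneshot receiver.
let receiver = appender
    .append(Record {
        partition_key: "".to_string(),
        payload: hstreamdb::common::Payload::RawRecord(b"hello".to_vec()),
    })
    .unwrap();

// The receiver resolves once the producer flushes the batch holding this
// record: Ok(record_id) on success, Err(Arc<common::Error>) shared with
// every other record of a failed batch, or RecvError if the producer
// dropped the sender without answering.
match receiver.await {
    Ok(Ok(record_id)) => println!("appended: {record_id}"),
    Ok(Err(err)) => eprintln!("append failed: {err}"),
    Err(recv_err) => eprintln!("no result: {recv_err}"),
}
```
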
src/hstreamdb/src/producer.rs (51 additions, 9 deletions)

```diff
@@ -4,6 +4,7 @@ use std::error::Error;
 use std::fmt::{Debug, Display};
 use std::io::Write;
 use std::mem;
+use std::sync::Arc;
 
 use flate2::write::GzEncoder;
 use flate2::Compression;
@@ -14,19 +15,26 @@ use hstreamdb_pb::{
     HStreamRecordHeader, ListShardsRequest, Shard,
 };
 use prost::Message;
+use tokio::sync::oneshot;
 use tokio::task::JoinHandle;
 use tonic::transport::Channel;
 
 use crate::channel_provider::Channels;
 use crate::common::{self, PartitionKey, Record, ShardId};
 use crate::utils::{self, clear_shard_buffer, lookup_shard, partition_key_to_shard_id};
 
+type ResultVec = Vec<oneshot::Sender<Result<String, Arc<common::Error>>>>;
+
 #[derive(Debug)]
-pub(crate) struct Request(pub(crate) Record);
+pub(crate) struct Request(
+    pub(crate) Record,
+    pub(crate) oneshot::Sender<Result<String, Arc<common::Error>>>,
+);
 
 pub struct Producer {
     tasks: Vec<JoinHandle<()>>,
     shard_buffer: HashMap<ShardId, Vec<Record>>,
+    shard_buffer_result: HashMap<ShardId, ResultVec>,
     shard_buffer_state: HashMap<ShardId, BufferState>,
     shard_urls: HashMap<ShardId, String>,
     request_receiver: tokio::sync::mpsc::UnboundedReceiver<Request>,
@@ -84,6 +92,7 @@ impl Producer {
         let producer = Producer {
             tasks: Vec::new(),
             shard_buffer: HashMap::new(),
+            shard_buffer_result: HashMap::new(),
             shard_buffer_state: HashMap::new(),
             shard_urls: HashMap::new(),
             request_receiver,
@@ -98,7 +107,7 @@
     }
 
     pub async fn start(&mut self) {
-        while let Some(Request(record)) = self.request_receiver.recv().await {
+        while let Some(Request(record, result_sender)) = self.request_receiver.recv().await {
            let partition_key = record.partition_key.clone();
             match partition_key_to_shard_id(&self.shards, partition_key.clone()) {
                 Err(err) => {
@@ -130,15 +139,25 @@ impl Producer {
                         buffer_state.modify(&record);
                         self.shard_buffer_state.insert(shard_id, buffer_state);
                         self.shard_buffer.insert(shard_id, vec![record]);
+                        self.shard_buffer_result
+                            .insert(shard_id, vec![result_sender]);
                     }
                     Some(buffer) => {
+                        self.shard_buffer_result
+                            .get_mut(&shard_id)
+                            .unwrap()
+                            .push(result_sender);
                         let buffer_state =
                             self.shard_buffer_state.get_mut(&shard_id).unwrap();
                         buffer_state.modify(&record);
                         buffer.push(record);
                         if buffer_state.check(&self.flush_settings) {
                             let buffer =
                                 clear_shard_buffer(&mut self.shard_buffer, shard_id);
+                            let results = clear_shard_buffer(
+                                &mut self.shard_buffer_result,
+                                shard_id,
+                            );
                             self.shard_buffer_state.insert(shard_id, default());
                             let task = tokio::spawn(flush_(
                                 self.channels.clone(),
@@ -147,6 +166,7 @@
                                 shard_url,
                                 self.compression_type,
                                 buffer,
+                                results,
                             ));
                             self.tasks.push(task);
                         }
@@ -160,6 +180,7 @@
 
         let mut shard_buffer = mem::take(&mut self.shard_buffer);
         for (shard_id, buffer) in shard_buffer.iter_mut() {
+            let results = self.shard_buffer_result.get_mut(shard_id).unwrap();
             let shard_url = self.shard_urls.get(shard_id);
             let shard_url_is_none = shard_url.is_none();
             match lookup_shard(
@@ -184,6 +205,7 @@
                         shard_url,
                         self.compression_type,
                         mem::take(buffer),
+                        mem::take(results),
                     ));
                     self.tasks.push(task);
                 }
@@ -206,25 +228,43 @@ async fn flush(
     shard_url: String,
     compression_type: CompressionType,
     buffer: Vec<Record>,
+    results: ResultVec,
 ) -> Result<(), String> {
-    if !buffer.is_empty() {
+    if buffer.is_empty() {
+        Ok(())
+    } else {
         let channel = channels
             .channel_at(shard_url.clone())
            .await
             .map_err(|err| format!("producer connect error: url = {shard_url}, {err}"))?;
-        append(
+        match append(
             channel,
             stream_name,
             shard_id,
             compression_type,
             buffer.to_vec(),
         )
         .await
-        .map_err(|err| format!("producer append error: url = {shard_url}, {err}"))
-        .map(|x| log::debug!("append succeed: len = {}", x.len()))?;
-        Ok(())
-    } else {
-        Ok(())
+        {
+            Err(err) => {
+                let err = Arc::new(err);
+                for sender in results.into_iter() {
+                    sender.send(Err(err.clone())).unwrap_or_else(|err| {
+                        log::error!("return append result error: err = {}", err.unwrap_err())
+                    })
+                }
+                Err(format!("producer append error: url = {shard_url}, {err}"))
+            }
+            Ok(append_result) => {
+                log::debug!("append succeed: len = {}", append_result.len());
+                for (result, sender) in append_result.into_iter().zip(results) {
+                    sender.send(Ok(result)).unwrap_or_else(|err| {
+                        log::error!("return append result error: ok = {}", err.unwrap())
+                    })
+                }
+                Ok(())
+            }
+        }
     }
 }
 
@@ -235,6 +275,7 @@ async fn flush_(
     shard_url: String,
     compression_type: CompressionType,
     buffer: Vec<Record>,
+    results: ResultVec,
 ) {
     flush(
         channels,
@@ -243,6 +284,7 @@
         shard_url,
         compression_type,
         buffer,
+        results,
     )
     .await
     .unwrap_or_else(|err| log::error!("{err}"))
```

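The core of the change lives in `flush`: the producer carries one `oneshot::Sender` per buffered record (in buffering order) and resolves all of them when the batch lands. A self-contained sketch of that fan-out pattern; `fan_out` is a stand-in name, the real code inlines this logic in `flush`, and `common::Error` is modeled here as `String`:

```rust
use std::sync::Arc;
use tokio::sync::oneshot;

type AppendResult = Result<String, Arc<String>>;

// Resolve one pending sender per record: on success, zip the server's
// per-record ids with the senders in buffering order; on failure, share
// a single error across all waiters via Arc.
fn fan_out(
    append_result: Result<Vec<String>, String>,
    results: Vec<oneshot::Sender<AppendResult>>,
) {
    match append_result {
        Ok(record_ids) => {
            for (id, sender) in record_ids.into_iter().zip(results) {
                let _ = sender.send(Ok(id)); // receiver may already be dropped
            }
        }
        Err(err) => {
            let err = Arc::new(err);
            for sender in results {
                let _ = sender.send(Err(err.clone()));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    fan_out(Ok(vec!["shard-0:record-0".to_string()]), vec![tx]);
    assert_eq!(rx.await.unwrap(), Ok("shard-0:record-0".to_string()));
}
```

Sharing the failure through `Arc` means `common::Error` never needs to be `Clone`, and every record of a failed batch observes the same error.
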
src/hstreamdb/src/utils.rs (5 additions, 5 deletions)

```diff
@@ -8,7 +8,7 @@ use num_bigint::BigInt;
 use num_traits::Num;
 use tonic::transport::Channel;
 
-use crate::common::{self, PartitionKey, Record, ShardId};
+use crate::common::{self, PartitionKey, ShardId};
 use crate::{format_url, Error};
 
 pub fn record_id_to_string(record_id: &RecordId) -> String {
@@ -39,15 +39,15 @@ pub async fn lookup_shard(
     }
 }
 
-pub fn clear_shard_buffer(
-    shard_buffer: &mut HashMap<ShardId, Vec<Record>>,
+pub fn clear_shard_buffer<A>(
+    shard_buffer: &mut HashMap<ShardId, Vec<A>>,
     shard_id: ShardId,
-) -> Vec<Record> {
+) -> Vec<A> {
     let raw_buffer = shard_buffer.get_mut(&shard_id).unwrap();
     clear_buffer(raw_buffer)
 }
 
-pub fn clear_buffer(buffer: &mut Vec<Record>) -> Vec<Record> {
+pub fn clear_buffer<A>(buffer: &mut Vec<A>) -> Vec<A> {
     let mut new_buffer = Vec::new();
     mem::swap(buffer, &mut new_buffer);
     new_buffer
```

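`clear_shard_buffer` is made generic over the element type so a single helper can drain both the record buffer (`Vec<Record>`) and the new result buffer (`ResultVec`). The swap-based `clear_buffer` is equivalent to `std::mem::take`; a quick standalone check of that equivalence:

```rust
// Swapping with a fresh Vec behaves like mem::take: the contents move
// out and the original is left empty.
fn clear_buffer<A>(buffer: &mut Vec<A>) -> Vec<A> {
    let mut new_buffer = Vec::new();
    std::mem::swap(buffer, &mut new_buffer);
    new_buffer
}

fn main() {
    let mut buf = vec![1, 2, 3];
    let drained = clear_buffer(&mut buf);
    assert_eq!(drained, vec![1, 2, 3]);
    assert!(buf.is_empty()); // same post-state as std::mem::take(&mut buf)
}
```
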
src/hstreamdb/tests/consumer_test.rs (23 additions, 8 deletions)

```diff
@@ -60,27 +60,42 @@ async fn test_consumer() {
         .await
         .unwrap();
 
-    let _ = tokio::spawn(async move {
-        let mut appender = appender;
+    let mut join_handles = Vec::new();
+    for _ in 0..10 {
+        let appender = appender.clone();
+        let join_handle = tokio::spawn(async move {
+            let mut appender = appender;
+            let mut results = Vec::new();
 
-        for _ in 0..10 {
             for _ in 0..100 {
-                appender
+                let result = appender
                     .append(Record {
                         partition_key: "".to_string(),
                         payload: hstreamdb::common::Payload::RawRecord(
                             rand_alphanumeric(20).as_bytes().to_vec(),
                         ),
                     })
                     .unwrap();
+                results.push(result)
             }
-        }
 
-        drop(appender)
-    });
+            drop(appender);
+            results
+        });
+        join_handles.push(join_handle)
+    }
 
     let mut producer = producer;
-    producer.start().await;
+    let producer = producer.start();
+    drop(appender);
+    producer.await;
+
+    for join_handle in join_handles {
+        let join_handle = join_handle.await.unwrap();
+        for result in join_handle {
+            println!("{}", result.await.unwrap().unwrap())
+        }
+    }
 
     let mut stream = client
         .streaming_fetch(
```

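The reworked tests also pin down the shutdown order: every `Appender` clone (including the original) must be dropped before `producer.await`, because `Producer::start` only returns once the underlying mpsc channel closes. A minimal model of that protocol (illustrative, not the crate's API):

```rust
// The producer's recv() loop ends only when every Sender clone
// (each Appender) has been dropped.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<u32>();
    let worker = tokio::spawn(async move {
        while let Some(n) = receiver.recv().await {
            println!("got {n}");
        }
        println!("all senders dropped; worker exits");
    });
    sender.send(1).unwrap();
    drop(sender); // without this, worker.await would never complete
    worker.await.unwrap();
}
```
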
src/hstreamdb/tests/producer_test.rs (34 additions, 15 deletions)

```diff
@@ -46,23 +46,42 @@ async fn test_producer() {
         .await
         .unwrap();
 
-    let _ = tokio::spawn(async move {
-        let mut appender = appender;
-        for _ in 0..100 {
-            appender
-                .append(Record {
-                    partition_key: "".to_string(),
-                    payload: hstreamdb::common::Payload::RawRecord(
-                        rand_alphanumeric(20).as_bytes().to_vec(),
-                    ),
-                })
-                .unwrap();
-        }
-        drop(appender)
-    });
+    let mut join_handles = Vec::new();
+    for _ in 0..10 {
+        let appender = appender.clone();
+        let join_handle = tokio::spawn(async move {
+            let mut appender = appender;
+            let mut results = Vec::new();
+
+            for _ in 0..100 {
+                let result = appender
+                    .append(Record {
+                        partition_key: "".to_string(),
+                        payload: hstreamdb::common::Payload::RawRecord(
+                            rand_alphanumeric(20).as_bytes().to_vec(),
+                        ),
+                    })
+                    .unwrap();
+                results.push(result)
+            }
+
+            drop(appender);
+            results
+        });
+        join_handles.push(join_handle)
+    }
 
     let mut producer = producer;
-    producer.start().await;
+    let producer = producer.start();
+    drop(appender);
+    producer.await;
+
+    for join_handle in join_handles {
+        let join_handle = join_handle.await.unwrap();
+        for result in join_handle {
+            println!("{}", result.await.unwrap().unwrap())
+        }
+    }
 
     client
         .delete_stream(stream_name, false, true)
```

src/x/hstreamdb-erl-nifs/src/lib.rs (1 addition, 1 deletion)

```diff
@@ -93,7 +93,7 @@ pub fn start_producer(
         let mut request_receiver = request_receiver;
         let mut appender = appender;
         while let Some(record) = request_receiver.recv().await {
-            appender.append(record).unwrap()
+            _ = appender.append(record).unwrap()
        }
     });
     producer.start().await
```

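Here the `_ =` binding discards the returned receiver, so appends driven from the Erlang side stay fire-and-forget under the new signature; the explicit discard also keeps the loop body evaluating to `()` now that `append` yields a `Receiver` on success.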