Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 97 additions & 205 deletions src/hstreamdb/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::default::default;
use std::error::Error;
use std::fmt::{Debug, Display};
use std::io::Write;
use std::mem;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -24,7 +23,7 @@ use tonic::transport::Channel;
use crate::channel_provider::Channels;
use crate::common::{self, PartitionKey, Record, ShardId};
use crate::flow_controller::FlowControllerClient;
use crate::utils::{self, clear_shard_buffer, lookup_shard, partition_key_to_shard_id};
use crate::utils::{self, partition_key_to_shard_id};

type ResultVec = Vec<oneshot::Sender<Result<String, Arc<common::Error>>>>;

Expand Down Expand Up @@ -198,51 +197,11 @@ impl Producer {
.iter()
.map(|(_, timer)| timer.abort())
.for_each(drop);
let mut shard_buffer = mem::take(&mut self.shard_buffer);
for (shard_id, buffer) in shard_buffer.iter_mut() {
let results = self.shard_buffer_result.get_mut(shard_id).unwrap();
let shard_url = self.shard_urls.get(shard_id);
let shard_url_is_none = shard_url.is_none();
match lookup_shard(
&mut self.channels.channel().await,
&self.url_scheme,
*shard_id,
shard_url,
)
.await
{
Err(err) => {
log::error!("lookup shard error: shard_id = {shard_id}, {err}")
}
Ok(shard_url) => {
if shard_url_is_none {
self.shard_urls.insert(*shard_id, shard_url.clone());
};

let buffer = mem::take(buffer);
let buffer_size = get_buffer_size(&buffer);
let release = self
.flow_controller
.clone()
.map(|x| async move { x.release(buffer_size).await });
let task = flush_(
self.channels.clone(),
self.stream_name.clone(),
*shard_id,
shard_url,
self.compression_type,
buffer,
mem::take(results),
);
let task = tokio::spawn(async move {
task.await;
if let Some(release) = release {
release.await
}
});
self.tasks.push(task);
}
}
let shard_ids = self.shard_buffer.keys().copied().collect::<Vec<_>>();
for shard_id in shard_ids {
self.flush(shard_id).await.unwrap_or_else(|err| {
log::error!("producer flush error: shard_id = {shard_id}, {err}")
});
}

let tasks = std::mem::take(&mut self.tasks);
Expand All @@ -256,52 +215,9 @@ impl Producer {
async fn handle_flush_request(&mut self, request: Option<ShardId>) {
{
let shard_id = request.unwrap();
self.shard_buffer_timer.remove(&shard_id);
let shard_url = self.shard_urls.get(&shard_id);
let shard_url_is_none = shard_url.is_none();
match lookup_shard(
&mut self.channels.channel().await,
&self.url_scheme,
shard_id,
shard_url,
)
.await
{
Err(err) => {
log::error!("lookup shard error: shard_id = {shard_id}, {err}")
}
Ok(shard_url) => {
if shard_url_is_none {
self.shard_urls.insert(shard_id, shard_url.clone());
};
let buffer = clear_shard_buffer(&mut self.shard_buffer, shard_id);
let results = clear_shard_buffer(&mut self.shard_buffer_result, shard_id);
self.shard_buffer_state.insert(shard_id, default());

let buffer_size = get_buffer_size(&buffer);
let release = self
.flow_controller
.clone()
.map(|x| async move { x.release(buffer_size).await });
let task = flush_(
self.channels.clone(),
self.stream_name.clone(),
shard_id,
shard_url,
self.compression_type,
buffer,
results,
);
let task = tokio::spawn(async move {
task.await;
if let Some(release) = release {
release.await
}
});
self.tasks.push(task);
self.shard_buffer.remove(&shard_id);
}
}
self.flush(shard_id).await.unwrap_or_else(|err| {
log::error!("producer flush error: shard_id = {shard_id}, {err}")
});
}
}

Expand All @@ -314,123 +230,99 @@ impl Producer {
"get shard id by partition key error: partition_key = {partition_key}, {err}"
)
}
Ok(shard_id) => {
let shard_url = self.shard_urls.get(&shard_id);
let shard_url_is_none = shard_url.is_none();
match lookup_shard(
&mut self.channels.channel().await,
&self.url_scheme,
shard_id,
shard_url,
)
.await
{
Err(err) => {
log::error!("lookup shard error: shard_id = {shard_id}, {err}")
Ok(shard_id) => match self.shard_buffer.get_mut(&shard_id) {
None => {
let mut buffer_state: BufferState = default();
buffer_state.modify(&record);
self.shard_buffer_state.insert(shard_id, buffer_state);
self.shard_buffer.insert(shard_id, vec![record]);
self.shard_buffer_result
.insert(shard_id, vec![result_sender]);

let buffer_state = self.shard_buffer_state.get_mut(&shard_id).unwrap();
if buffer_state.check(&self.flush_settings) {
self.flush(shard_id).await.unwrap_or_else(|err| {
log::error!("producer flush error: shard_id = {shard_id}, {err}")
});
} else if let Some(deadline) = self.flush_settings.deadline {
let sender = self.deadline_request_sender.clone();
let timer = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(deadline as _)).await;
sender.send(shard_id).unwrap();
});
self.shard_buffer_timer.insert(shard_id, timer);
}
Ok(shard_url) => {
if shard_url_is_none {
self.shard_urls.insert(shard_id, shard_url.clone());
};
match self.shard_buffer.get_mut(&shard_id) {
None => {
let mut buffer_state: BufferState = default();
buffer_state.modify(&record);
self.shard_buffer_state.insert(shard_id, buffer_state);
self.shard_buffer.insert(shard_id, vec![record]);
self.shard_buffer_result
.insert(shard_id, vec![result_sender]);

let buffer_state =
self.shard_buffer_state.get_mut(&shard_id).unwrap();
if buffer_state.check(&self.flush_settings) {
let buffer =
clear_shard_buffer(&mut self.shard_buffer, shard_id);
let results =
clear_shard_buffer(&mut self.shard_buffer_result, shard_id);
self.shard_buffer_state.insert(shard_id, default());

let buffer_size = get_buffer_size(&buffer);
let release = self
.flow_controller
.clone()
.map(|x| async move { x.release(buffer_size).await });
let task = flush_(
self.channels.clone(),
self.stream_name.clone(),
shard_id,
shard_url,
self.compression_type,
buffer,
results,
);
let task = tokio::spawn(async move {
task.await;
if let Some(release) = release {
release.await
}
});
self.tasks.push(task);
self.shard_buffer.remove(&shard_id);
} else if let Some(deadline) = self.flush_settings.deadline {
let sender = self.deadline_request_sender.clone();
let timer = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(deadline as _))
.await;
sender.send(shard_id).unwrap();
});
self.shard_buffer_timer.insert(shard_id, timer);
}
}
Some(buffer) => {
if let Some(x) = self.shard_buffer_timer.remove(&shard_id) {
x.abort()
}
self.shard_buffer_result
.get_mut(&shard_id)
.unwrap()
.push(result_sender);
let buffer_state =
self.shard_buffer_state.get_mut(&shard_id).unwrap();
buffer_state.modify(&record);
buffer.push(record);

if buffer_state.check(&self.flush_settings) {
let buffer =
clear_shard_buffer(&mut self.shard_buffer, shard_id);
let results =
clear_shard_buffer(&mut self.shard_buffer_result, shard_id);
self.shard_buffer_state.insert(shard_id, default());

let buffer_size = get_buffer_size(&buffer);
let release = self
.flow_controller
.clone()
.map(|x| async move { x.release(buffer_size).await });
let task = flush_(
self.channels.clone(),
self.stream_name.clone(),
shard_id,
shard_url,
self.compression_type,
buffer,
results,
);
let task = tokio::spawn(async move {
task.await;
if let Some(release) = release {
release.await
}
});
self.tasks.push(task);
self.shard_buffer.remove(&shard_id);
}
}
}
}
Some(buffer) => {
if let Some(x) = self.shard_buffer_timer.remove(&shard_id) {
x.abort()
}
self.shard_buffer_result
.get_mut(&shard_id)
.unwrap()
.push(result_sender);
let buffer_state = self.shard_buffer_state.get_mut(&shard_id).unwrap();
buffer_state.modify(&record);
buffer.push(record);

if buffer_state.check(&self.flush_settings) {
self.flush(shard_id).await.unwrap_or_else(|err| {
log::error!("producer flush error: shard_id = {shard_id}, {err}")
});
}
}
},
}
}

/// Drain and asynchronously flush the buffered records for `shard_id`.
///
/// The shard URL is resolved *before* the per-shard buffers are drained,
/// so a lookup failure (propagated via `?`) leaves the buffer, the
/// pending result senders, the buffer state, and the deadline timer
/// untouched — a later flush attempt can still deliver the records.
/// On success the buffers are taken, the deadline timer is cancelled,
/// and the actual append runs as a spawned background task; the
/// flow-controller quota (if a controller is configured) is released
/// only after that append completes.
async fn flush(&mut self, shard_id: ShardId) -> common::Result<()> {
    // Resolve the target URL first: failing here must not drop the
    // buffered records or close their result channels.
    let shard_url = self.lookup_shard(shard_id).await?;

    let buffer = self.shard_buffer.remove(&shard_id).unwrap();
    let results = self.shard_buffer_result.remove(&shard_id).unwrap();
    _ = self.shard_buffer_state.remove(&shard_id);
    // Cancel the pending deadline timer so it cannot request a second
    // flush for a buffer that no longer exists.
    if let Some(x) = self.shard_buffer_timer.remove(&shard_id) {
        x.abort()
    }
    let buffer_size = get_buffer_size(&buffer);

    // Release the flow-control quota for these bytes once the append
    // finishes (no-op when no flow controller is configured).
    let release = self
        .flow_controller
        .clone()
        .map(|x| async move { x.release(buffer_size).await });
    let task = flush_(
        self.channels.clone(),
        self.stream_name.clone(),
        shard_id,
        shard_url,
        self.compression_type,
        buffer,
        results,
    );
    let task = tokio::spawn(async move {
        task.await;
        if let Some(release) = release {
            release.await
        }
    });
    // Track the spawned task so shutdown can await completion.
    self.tasks.push(task);

    Ok(())
}

/// Return the URL of the server node hosting `shard_id`.
///
/// Consults the local `shard_urls` cache first; on a cache miss the
/// freshly looked-up URL is memoized for subsequent calls. Errors from
/// the underlying RPC lookup are propagated to the caller.
async fn lookup_shard(&mut self, shard_id: ShardId) -> common::Result<String> {
    let cached = self.shard_urls.get(&shard_id);
    let was_cached = cached.is_some();
    let resolved = utils::lookup_shard(
        &mut self.channels.channel().await,
        &self.url_scheme,
        shard_id,
        cached,
    )
    .await?;
    // Memoize only when the read above was a miss.
    if !was_cached {
        _ = self.shard_urls.insert(shard_id, resolved.clone())
    }
    Ok(resolved)
}
}

Expand Down
17 changes: 0 additions & 17 deletions src/hstreamdb/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::mem;

use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
use hstreamdb_pb::{LookupShardRequest, RecordId, Shard};
use md5::{Digest, Md5};
Expand Down Expand Up @@ -39,20 +36,6 @@ pub async fn lookup_shard(
}
}

/// Take and return the buffered items for `shard_id`, leaving an empty
/// `Vec` behind in the map entry.
///
/// Panics if `shard_id` has no entry in `shard_buffer` — callers are
/// expected to have populated the buffer before flushing it.
pub fn clear_shard_buffer<A>(
    shard_buffer: &mut HashMap<ShardId, Vec<A>>,
    shard_id: ShardId,
) -> Vec<A> {
    clear_buffer(shard_buffer.get_mut(&shard_id).unwrap())
}

/// Move the contents out of `buffer`, leaving it empty.
///
/// `mem::take` replaces the vector with `Vec::default()` (a fresh empty
/// `Vec`), which is the standard-library idiom for the manual
/// `Vec::new()` + `mem::swap` dance.
pub fn clear_buffer<A>(buffer: &mut Vec<A>) -> Vec<A> {
    mem::take(buffer)
}

pub fn partition_key_to_shard_id(
shards: &[Shard],
partition_key: PartitionKey,
Expand Down