Skip to content

refactor: use channel_provider #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/hstreamdb/src/channel_provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tonic::transport::Channel;
Expand All @@ -20,6 +20,21 @@ pub(crate) struct ChannelProvider {
channels: HashMap<String, HStreamApiClient<Channel>>,
}

pub(crate) async fn new_channel_provider(
url_scheme: &str,
channel: &mut HStreamApiClient<Channel>,
) -> common::Result<Channels> {
let (channel_provider_request_sender, channel_provider_request_receiver) = unbounded_channel();
let channels =
ChannelProvider::new(channel, url_scheme, channel_provider_request_receiver).await?;
_ = tokio::spawn(async move {
let mut channels = channels;
channels.start().await
});
let channels = Channels::new(channel_provider_request_sender);
Ok(channels)
}

#[derive(Clone)]
pub(crate) struct Channels(UnboundedSender<Request>);

Expand Down
50 changes: 33 additions & 17 deletions src/hstreamdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use tonic::Request;
use url::Url;

use crate::appender::Appender;
use crate::channel_provider::{new_channel_provider, Channels};
use crate::producer::{FlushSettings, Producer};
use crate::{common, format_url, producer};

pub struct Client {
pub(crate) hstream_api_client: HStreamApiClient<Channel>,
pub(crate) channels: Channels,
url_scheme: String,
_available_node_addrs: Vec<String>,
}

impl Client {
Expand All @@ -26,16 +26,21 @@ impl Client {
let server_url = server_url.into();
let url = Url::parse(&server_url)?;
let mut hstream_api_client = HStreamApiClient::connect(server_url).await?;
let _available_node_addrs =
get_available_node_addrs(&mut hstream_api_client, url.scheme()).await?;
let url_scheme = url.scheme().to_string();
let channels = new_channel_provider(&url_scheme, &mut hstream_api_client).await?;
Ok(Client {
hstream_api_client,
url_scheme: url.scheme().to_string(),
_available_node_addrs,
channels,
url_scheme,
})
}
}

impl Client {
async fn new_channel_provider(&self) -> common::Result<Channels> {
new_channel_provider(&self.url_scheme, &mut self.channels.channel().await).await
}
}

pub(crate) async fn get_available_node_addrs(
client: &mut HStreamApiClient<Channel>,
url_scheme: &str,
Expand Down Expand Up @@ -64,7 +69,7 @@ impl Client {

impl Client {
pub async fn create_stream(&mut self, stream: Stream) -> common::Result<()> {
self.hstream_api_client.create_stream(stream).await?;
self.channels.channel().await.create_stream(stream).await?;
Ok(())
}

Expand All @@ -79,15 +84,19 @@ impl Client {
ignore_non_exist,
force,
};
self.hstream_api_client
self.channels
.channel()
.await
.delete_stream(delete_stream_request)
.await?;
Ok(())
}

pub async fn list_streams(&mut self) -> common::Result<Vec<Stream>> {
let streams = self
.hstream_api_client
.channels
.channel()
.await
.list_streams(ListStreamsRequest {})
.await?
.into_inner()
Expand All @@ -99,7 +108,9 @@ impl Client {
impl Client {
pub async fn create_subscription(&mut self, subscription: Subscription) -> common::Result<()> {
let subscription: hstreamdb_pb::Subscription = subscription.into();
self.hstream_api_client
self.channels
.channel()
.await
.create_subscription(subscription)
.await?;
Ok(())
Expand All @@ -110,8 +121,8 @@ impl Client {
subscription_id: String,
force: bool,
) -> common::Result<()> {
let channel = self.lookup_subscription(subscription_id.clone()).await?;
let mut channel = HStreamApiClient::connect(channel).await?;
let url = self.lookup_subscription(subscription_id.clone()).await?;
let mut channel = self.channels.channel_at(url).await?;
channel
.delete_subscription(DeleteSubscriptionRequest {
subscription_id,
Expand All @@ -123,7 +134,9 @@ impl Client {

pub async fn list_subscriptions(&mut self) -> common::Result<Vec<Subscription>> {
let subscriptions = self
.hstream_api_client
.channels
.channel()
.await
.list_subscriptions(ListSubscriptionsRequest {})
.await?
.into_inner()
Expand All @@ -142,13 +155,14 @@ impl Client {
compression_type: CompressionType,
flush_settings: FlushSettings,
) -> common::Result<(Appender, Producer)> {
let channel = self.hstream_api_client.clone();
let (request_sender, request_receiver) =
tokio::sync::mpsc::unbounded_channel::<producer::Request>();

let channels = self.new_channel_provider().await?;

let appender = Appender::new(request_sender.clone());
let producer = Producer::new(
channel,
channels,
self.url_scheme.clone(),
request_receiver,
stream_name,
Expand All @@ -166,7 +180,9 @@ impl Client {
subscription_id: String,
) -> common::Result<String> {
let server_node = self
.hstream_api_client
.channels
.channel()
.await
.lookup_subscription(LookupSubscriptionRequest { subscription_id })
.await?
.into_inner()
Expand Down
2 changes: 1 addition & 1 deletion src/hstreamdb/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Client {

let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(Payload, AckFn)>();

let _ = tokio::spawn(fetching(
_ = tokio::spawn(fetching(
consumer_name,
subscription_id,
request_sender,
Expand Down
19 changes: 5 additions & 14 deletions src/hstreamdb/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use hstreamdb_pb::{
HStreamRecordHeader, ListShardsRequest, Shard,
};
use prost::Message;
use tokio::sync::mpsc::unbounded_channel;
use tokio::task::JoinHandle;
use tonic::transport::Channel;

use crate::channel_provider::{ChannelProvider, Channels};
use crate::channel_provider::Channels;
use crate::common::{self, PartitionKey, Record, ShardId};
use crate::utils::{self, clear_shard_buffer, lookup_shard, partition_key_to_shard_id};

Expand Down Expand Up @@ -65,30 +64,22 @@ impl BufferState {

impl Producer {
pub(crate) async fn new(
mut channel: HStreamApiClient<Channel>,
channels: Channels,
url_scheme: String,
request_receiver: tokio::sync::mpsc::UnboundedReceiver<Request>,
stream_name: String,
compression_type: CompressionType,
flush_settings: FlushSettings,
) -> common::Result<Self> {
let shards = channel
let shards = channels
.channel()
.await
.list_shards(ListShardsRequest {
stream_name: stream_name.clone(),
})
.await?
.into_inner()
.shards;
let (channel_provider_request_sender, channel_provider_request_receiver) =
unbounded_channel();
let channels =
ChannelProvider::new(&mut channel, &url_scheme, channel_provider_request_receiver)
.await?;
_ = tokio::spawn(async move {
let mut channels = channels;
channels.start().await
});
let channels = Channels::new(channel_provider_request_sender);
let producer = Producer {
tasks: Vec::new(),
shard_buffer: HashMap::new(),
Expand Down