Skip to content

Commit 2d0ee57

Browse files
authored
pb: exported pb types should use enum directly (#19)
* pb: pb: exported pb types should use enum directly * fix check
1 parent b717e7e commit 2d0ee57

File tree

3 files changed

+70
-26
lines changed

3 files changed

+70
-26
lines changed

src/hstreamdb/src/client.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use common::Stream;
1+
use common::{Stream, Subscription};
22
use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
33
use hstreamdb_pb::{
44
CompressionType, DeleteStreamRequest, DeleteSubscriptionRequest, ListStreamsRequest,
5-
ListSubscriptionsRequest, LookupSubscriptionRequest, NodeState, Subscription,
5+
ListSubscriptionsRequest, LookupSubscriptionRequest, NodeState,
66
};
77
use tonic::transport::Channel;
88
use tonic::Request;
@@ -98,6 +98,7 @@ impl Client {
9898

9999
impl Client {
100100
pub async fn create_subscription(&mut self, subscription: Subscription) -> common::Result<()> {
101+
let subscription: hstreamdb_pb::Subscription = subscription.into();
101102
self.hstream_api_client
102103
.create_subscription(subscription)
103104
.await?;
@@ -126,7 +127,10 @@ impl Client {
126127
.list_subscriptions(ListSubscriptionsRequest {})
127128
.await?
128129
.into_inner()
129-
.subscription;
130+
.subscription
131+
.into_iter()
132+
.map(|x| x.into())
133+
.collect();
130134
Ok(subscriptions)
131135
}
132136
}
@@ -183,10 +187,11 @@ impl Client {
183187
mod tests {
184188
use std::env;
185189

186-
use hstreamdb_pb::{SpecialOffset, Stream, Subscription};
190+
use hstreamdb_pb::{SpecialOffset, Stream};
187191
use hstreamdb_test_utils::rand_alphanumeric;
188192

189193
use super::Client;
194+
use crate::Subscription;
190195

191196
#[tokio::test(flavor = "multi_thread")]
192197
async fn test_stream_cld() {
@@ -249,7 +254,7 @@ mod tests {
249254
stream_name,
250255
ack_timeout_seconds: 60 * 10,
251256
max_unacked_records: 1000,
252-
offset: SpecialOffset::Earliest as i32,
257+
offset: SpecialOffset::Earliest,
253258
};
254259
for stream in streams.iter() {
255260
let subscription_ids = (0..5)

src/hstreamdb/src/common.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,43 @@
11
use std::io;
22

33
use hstreamdb_pb::StreamingFetchRequest;
4-
pub use hstreamdb_pb::{Stream, Subscription};
4+
pub use hstreamdb_pb::{SpecialOffset, Stream};
55
use num_bigint::ParseBigIntError;
66
use tonic::transport;
77

8+
pub struct Subscription {
9+
pub subscription_id: String,
10+
pub stream_name: String,
11+
pub ack_timeout_seconds: i32,
12+
pub max_unacked_records: i32,
13+
pub offset: SpecialOffset,
14+
}
15+
16+
impl From<Subscription> for hstreamdb_pb::Subscription {
17+
fn from(subscription: Subscription) -> Self {
18+
hstreamdb_pb::Subscription {
19+
subscription_id: subscription.subscription_id,
20+
stream_name: subscription.stream_name,
21+
ack_timeout_seconds: subscription.ack_timeout_seconds,
22+
max_unacked_records: subscription.max_unacked_records,
23+
offset: subscription.offset as _,
24+
}
25+
}
26+
}
27+
28+
impl From<hstreamdb_pb::Subscription> for Subscription {
29+
fn from(subscription: hstreamdb_pb::Subscription) -> Self {
30+
let offset = subscription.offset();
31+
Subscription {
32+
subscription_id: subscription.subscription_id,
33+
stream_name: subscription.stream_name,
34+
ack_timeout_seconds: subscription.ack_timeout_seconds,
35+
max_unacked_records: subscription.max_unacked_records,
36+
offset,
37+
}
38+
}
39+
}
40+
841
#[derive(Debug)]
942
pub enum Error {
1043
TransportError(transport::Error),

src/hstreamdb/src/producer.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl BufferState {
5757
}
5858

5959
fn check(&self, flush_settings: &FlushSettings) -> bool {
60-
(flush_settings.len >= self.len) || (flush_settings.size >= self.size)
60+
(self.len >= flush_settings.len) || (self.size >= flush_settings.size)
6161
}
6262
}
6363

@@ -157,27 +157,33 @@ async fn flush(
157157
compression_type: CompressionType,
158158
buffer: Vec<Record>,
159159
) -> Result<(), String> {
160-
match lookup_shard(&mut channel, &url_scheme, shard_id).await {
161-
Err(err) => {
162-
log::warn!("{err}");
163-
Ok(())
164-
}
165-
Ok(server_node) => {
166-
let channel = HStreamApiClient::connect(server_node.clone())
160+
if !buffer.is_empty() {
161+
match lookup_shard(&mut channel, &url_scheme, shard_id).await {
162+
Err(err) => {
163+
log::warn!("{err}");
164+
Ok(())
165+
}
166+
Ok(server_node) => {
167+
let channel = HStreamApiClient::connect(server_node.clone())
168+
.await
169+
.map_err(|err| {
170+
format!("producer connect error: addr = {server_node}, {err}")
171+
})?;
172+
append(
173+
channel,
174+
stream_name,
175+
shard_id,
176+
compression_type,
177+
buffer.to_vec(),
178+
)
167179
.await
168-
.map_err(|err| format!("producer connect error: addr = {server_node}, {err}"))?;
169-
append(
170-
channel,
171-
stream_name,
172-
shard_id,
173-
compression_type,
174-
buffer.to_vec(),
175-
)
176-
.await
177-
.map_err(|err| format!("producer append error: addr = {server_node}, {err:?}"))
178-
.map(|x| log::debug!("append succeed: len = {}", x.len()))?;
179-
Ok(())
180+
.map_err(|err| format!("producer append error: addr = {server_node}, {err:?}"))
181+
.map(|x| log::debug!("append succeed: len = {}", x.len()))?;
182+
Ok(())
183+
}
180184
}
185+
} else {
186+
Ok(())
181187
}
182188
}
183189

0 commit comments

Comments
 (0)