Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/hstreamdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ version = "0.1.0"
[dependencies]
workspace-hack = { version = "0.1", path = "../utils/workspace-hack" }

thiserror = "1.0.34"
log = "0.4.17"

flate2 = "1.0.24"
Expand Down
10 changes: 3 additions & 7 deletions src/hstreamdb/src/appender.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::{PartitionKey, Record};
use crate::common::Record;
use crate::producer::{self, Request};

#[derive(Clone)]
Expand All @@ -13,13 +13,9 @@ impl Appender {
}

impl Appender {
pub fn append(
&mut self,
partition_key: PartitionKey,
record: Record,
) -> Result<(), producer::SendError> {
pub fn append(&mut self, record: Record) -> Result<(), producer::SendError> {
self.request_sender
.send(Request(partition_key, record))
.send(Request(record))
.map_err(Into::into)
}
}
2 changes: 1 addition & 1 deletion src/hstreamdb/src/channel_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ChannelProvider {
}
}
if channels.is_empty() {
Err(common::Error::NoAvailableChannel)
Err(common::Error::NoChannelAvailable)
} else {
Ok(ChannelProvider {
request_receiver,
Expand Down
18 changes: 14 additions & 4 deletions src/hstreamdb/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use hstreamdb_pb::StreamingFetchRequest;
pub use hstreamdb_pb::{SpecialOffset, Stream};
pub use hstreamdb_pb::{CompressionType, SpecialOffset, Stream};
use num_bigint::ParseBigIntError;
use tonic::transport;

Expand Down Expand Up @@ -38,21 +38,31 @@ impl From<hstreamdb_pb::Subscription> for Subscription {
}
}

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
TransportError(transport::Error),
#[error(transparent)]
GrpcStatusError(tonic::Status),
#[error(transparent)]
CompressError(io::Error),
#[error(transparent)]
ParseUrlError(url::ParseError),
#[error(transparent)]
PartitionKeyError(PartitionKeyError),
#[error(transparent)]
StreamingFetchInitError(tokio::sync::mpsc::error::SendError<StreamingFetchRequest>),
#[error("failed to unwrap `{0}`")]
PBUnwrapError(String),
NoAvailableChannel,
#[error("No channel is available")]
NoChannelAvailable,
}

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum PartitionKeyError {
#[error(transparent)]
ParseBigIntError(ParseBigIntError),
#[error("No match for the partition key")]
NoMatch,
}

Expand Down
72 changes: 71 additions & 1 deletion src/hstreamdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,73 @@
//! Rust client library for [HStreamDB](https://hstream.io/)

//! ## Write Data to Streams
//! ```
//! use std::env;
//!
//! use hstreamdb::client::Client;
//! use hstreamdb::producer::FlushSettings;
//! use hstreamdb::{CompressionType, Payload, Record, Stream};
//! use rand::distributions::Alphanumeric;
//! use rand::{thread_rng, Rng};
//!
//! async fn produce_example() -> anyhow::Result<()> {
//! let mut client = Client::new(env::var("TEST_SERVER_ADDR")?).await?;
//!
//! let stream_name = "test_stream";
//!
//! client
//! .create_stream(Stream {
//! stream_name: "test_stream".to_string(),
//! replication_factor: 3,
//! backlog_duration: 7 * 24 * 3600,
//! shard_count: 12,
//! })
//! .await?;
//! println!("{:?}", client.list_streams().await?);
//!
//! // `Appender` is cheap to clone
//! let (appender, mut producer) = client
//! .new_producer(
//! stream_name.to_string(),
//! hstreamdb_pb::CompressionType::Zstd,
//! FlushSettings {
//! len: 10,
//! size: 4000 * 20,
//! },
//! )
//! .await?;
//!
//! _ = tokio::spawn(async move {
//! let mut appender = appender;
//!
//! for _ in 0..10 {
//! for _ in 0..100 {
//! let i: u32 = rand::random();
//! let payload: Vec<u8> = thread_rng()
//! .sample_iter(&Alphanumeric)
//! .take(20)
//! .map(char::from)
//! .collect::<String>()
//! .into_bytes();
//! appender
//! .append(Record {
//! partition_key: format!("test_partition_key_{i}"),
//! payload: Payload::RawRecord(payload),
//! })
//! .unwrap();
//! }
//! }
//! drop(appender)
//! });
//!
//! // when all `Appender`s for the corresponding `Producer` have been dropped,
//! // the `Producer` will wait for all requests to be done and then stop
//! producer.start().await;
//!
//! Ok(())
//! }
//! ```

#![feature(try_blocks)]
#![feature(box_syntax)]
#![feature(default_free_fn)]
Expand All @@ -10,4 +80,4 @@ pub mod consumer;
pub mod producer;
pub mod utils;

pub use common::{Error, Record, Result, Stream, Subscription};
pub use common::{CompressionType, Error, Payload, Record, Result, Stream, Subscription};
5 changes: 3 additions & 2 deletions src/hstreamdb/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::common::{self, PartitionKey, Record, ShardId};
use crate::utils::{self, clear_shard_buffer, lookup_shard, partition_key_to_shard_id};

#[derive(Debug)]
pub(crate) struct Request(pub(crate) PartitionKey, pub(crate) Record);
pub(crate) struct Request(pub(crate) Record);

pub struct Producer {
tasks: Vec<JoinHandle<()>>,
Expand Down Expand Up @@ -96,7 +96,8 @@ impl Producer {
}

pub async fn start(&mut self) {
while let Some(Request(partition_key, record)) = self.request_receiver.recv().await {
while let Some(Request(record)) = self.request_receiver.recv().await {
let partition_key = record.partition_key.clone();
match partition_key_to_shard_id(&self.shards, partition_key) {
Err(err) => {
log::error!("get shard id by partition key error: {:?}", err)
Expand Down
15 changes: 6 additions & 9 deletions src/hstreamdb/tests/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ async fn test_consumer() {
for _ in 0..10 {
for _ in 0..100 {
appender
.append(
"".to_string(),
Record {
partition_key: "".to_string(),
payload: hstreamdb::common::Payload::RawRecord(
rand_alphanumeric(20).as_bytes().to_vec(),
),
},
)
.append(Record {
partition_key: "".to_string(),
payload: hstreamdb::common::Payload::RawRecord(
rand_alphanumeric(20).as_bytes().to_vec(),
),
})
.unwrap();
}
}
Expand Down
15 changes: 6 additions & 9 deletions src/hstreamdb/tests/producer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ async fn test_producer() {
let mut appender = appender;
for _ in 0..100 {
appender
.append(
"".to_string(),
Record {
partition_key: "".to_string(),
payload: hstreamdb::common::Payload::RawRecord(
rand_alphanumeric(20).as_bytes().to_vec(),
),
},
)
.append(Record {
partition_key: "".to_string(),
payload: hstreamdb::common::Payload::RawRecord(
rand_alphanumeric(20).as_bytes().to_vec(),
),
})
.unwrap();
}
drop(appender)
Expand Down