Skip to content

Commit 72741a2

Browse files
authored
doc: add consume example (#29)
* doc: add consume example * chore: add version
1 parent a4cb4da commit 72741a2

File tree

4 files changed

+66
-21
lines changed

4 files changed

+66
-21
lines changed

Cargo.lock

Lines changed: 10 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/hstreamdb/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ num-bigint = "0.4.3"
1616
num-traits = "0.2.15"
1717
md-5 = "0.10.1"
1818

19-
hstreamdb-pb = { path = "../hstreamdb-pb/" }
19+
hstreamdb-pb = { version = "0.1.0", path = "../hstreamdb-pb/" }
2020
prost-types = "0.11.1"
2121

2222
prost = "0.11.0"

src/hstreamdb/src/common.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub use hstreamdb_pb::{CompressionType, SpecialOffset, Stream};
55
use num_bigint::ParseBigIntError;
66
use tonic::transport;
77

8+
#[derive(Debug)]
89
pub struct Subscription {
910
pub subscription_id: String,
1011
pub stream_name: String,

src/hstreamdb/src/lib.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Rust client library for [HStreamDB](https://hstream.io/)
22
33
//! ## Write Data to Streams
4+
//!
45
//! ```
56
//! use std::env;
67
//!
@@ -67,8 +68,57 @@
6768
//! Ok(())
6869
//! }
6970
//! ```
71+
//!
72+
//! ## Read Data from Subscriptions
73+
//!
74+
//! ```
75+
//! use std::env;
76+
//!
77+
//! use hstreamdb::client::Client;
78+
//! use hstreamdb::{SpecialOffset, Subscription};
79+
//! use tokio_stream::StreamExt;
80+
//!
81+
//! async fn consume_example() -> anyhow::Result<()> {
82+
//! let addr = env::var("TEST_SERVER_ADDR").unwrap();
83+
//! let mut client = Client::new(addr).await.unwrap();
84+
//!
85+
//! let stream_name = "test_stream";
86+
//! let subscription_id = "test_subscription";
87+
//!
88+
//! client
89+
//! .create_subscription(Subscription {
90+
//! subscription_id: subscription_id.to_string(),
91+
//! stream_name: stream_name.to_string(),
92+
//! ack_timeout_seconds: 60 * 60,
93+
//! max_unacked_records: 1000,
94+
//! offset: SpecialOffset::Earliest,
95+
//! })
96+
//! .await?;
97+
//! println!("{:?}", client.list_subscriptions().await?);
98+
//!
99+
//! let mut stream = client
100+
//! .streaming_fetch("test_consumer".to_string(), subscription_id.to_string())
101+
//! .await
102+
//! .unwrap();
103+
//! let mut records = Vec::new();
104+
//! while let Some((record, ack)) = stream.next().await {
105+
//! println!("{record:?}");
106+
//! records.push(record);
107+
//! ack().unwrap();
108+
//! if records.len() == 10 * 100 {
109+
//! println!("done");
110+
//! break;
111+
//! }
112+
//! }
113+
//!
114+
//! client
115+
//! .delete_subscription(subscription_id.to_string(), true)
116+
//! .await?;
117+
//!
118+
//! Ok(())
119+
//! }
120+
//! ```
70121
71-
#![feature(try_blocks)]
72122
#![feature(box_syntax)]
73123
#![feature(default_free_fn)]
74124

@@ -80,4 +130,6 @@ pub mod consumer;
80130
pub mod producer;
81131
pub mod utils;
82132

83-
pub use common::{CompressionType, Error, Payload, Record, Result, Stream, Subscription};
133+
pub use common::{
134+
CompressionType, Error, Payload, Record, Result, SpecialOffset, Stream, Subscription,
135+
};

0 commit comments

Comments
 (0)