Skip to content

Commit e66cedf

Browse files
authored
enable nats jetstream deduplication (#2)
1 parent e850536 commit e66cedf

File tree

4 files changed

+20
-4
lines changed

4 files changed

+20
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-stream"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
edition = "2021"
55

66
[lib]

src/bin/pyth_reader.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22
use async_nats::jetstream::{self};
3+
use async_nats::HeaderMap;
34
use clap::Parser;
45
use config::Config;
56
use pyth_sdk_solana::state::{load_price_account, PriceStatus, PythnetPriceAccount};
@@ -160,13 +161,26 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
160161

161162
let message = serde_json::to_string(&price_update)?;
162163

164+
// Create a unique message ID
165+
let message_id = format!(
166+
"{}:{}",
167+
price_update.price_feed.id, price_update.price_feed.price.publish_time
168+
);
169+
170+
// Create headers with the Nats-Msg-Id
171+
let mut headers = HeaderMap::new();
172+
headers.insert("Nats-Msg-Id", message_id.as_str());
173+
163174
let jetstream_clone = jetstream.clone();
164175
task::spawn(async move {
165176
match jetstream_clone
166-
.publish("pyth.price.updates", message.into())
177+
.publish_with_headers("pyth.price.updates", headers, message.into())
167178
.await
168179
{
169-
Ok(_) => debug!("Published price update to JetStream"),
180+
Ok(_) => debug!(
181+
"Published price update to JetStream with ID: {}",
182+
message_id
183+
),
170184
Err(e) => warn!("Failed to publish price update to JetStream: {}", e),
171185
}
172186
});

src/utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub async fn setup_jetstream(nats_client: &async_nats::Client) -> Result<jetstre
1414
duplicate_window: Duration::from_secs(60),
1515
discard: stream::DiscardPolicy::New,
1616
max_messages_per_subject: 100,
17+
allow_direct: true,
18+
allow_rollup: true,
1719
..Default::default()
1820
};
1921

0 commit comments

Comments
 (0)