From 1be7e61d4c65939f9e98d93d2afb29618ff4d0b1 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 11 Apr 2024 18:09:08 +0900 Subject: [PATCH 1/7] add allow_unordered query param --- hermes/src/api/sse.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/hermes/src/api/sse.rs b/hermes/src/api/sse.rs index 1b690836ad..b9b4521c7d 100644 --- a/hermes/src/api/sse.rs +++ b/hermes/src/api/sse.rs @@ -63,6 +63,10 @@ pub struct StreamPriceUpdatesQueryParams { /// If true, include the parsed price update in the `parsed` field of each returned feed. #[serde(default = "default_true")] parsed: bool, + + /// If true, allows unordered price updates to be included in the stream. + #[serde(default)] + allow_unordered: bool, } fn default_true() -> bool { @@ -99,6 +103,12 @@ pub async fn price_stream_sse_handler( async move { match message { Ok(event) => { + // Check if the event is out-of-order and if unordered updates are not allowed + if let AggregationEvent::OutOfOrder { .. } = event { + if !params.allow_unordered { + return Ok(Event::default().comment("Out-of-order event skipped")); + } + } match handle_aggregation_event( event, state_clone, From a4a9e507f6b578b03fd24ef41ab01d1b2fb5b471 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 11 Apr 2024 22:53:30 +0900 Subject: [PATCH 2/7] add benchmarks_only query params --- hermes/src/api/sse.rs | 68 +++++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/hermes/src/api/sse.rs b/hermes/src/api/sse.rs index b9b4521c7d..22fe6a0d39 100644 --- a/hermes/src/api/sse.rs +++ b/hermes/src/api/sse.rs @@ -15,6 +15,7 @@ use { ParsedPriceUpdate, PriceIdInput, PriceUpdate, + RpcPriceIdentifier, }, ApiState, }, @@ -67,6 +68,10 @@ pub struct StreamPriceUpdatesQueryParams { /// If true, allows unordered price updates to be included in the stream. #[serde(default)] allow_unordered: bool, + + /// If true, only include benchmark prices that are the initial price updates at a given timestamp (i.e., prevPubTime != pubTime). + #[serde(default)] + benchmarks_only: bool, } fn default_true() -> bool { @@ -115,10 +120,18 @@ pub async fn price_stream_sse_handler( price_ids_clone, params.encoding, params.parsed, + params.benchmarks_only, ) .await { - Ok(price_update) => Ok(Event::default().json_data(price_update).unwrap()), + Ok(price_update) => { + // Check if there is any data to send + if price_update.binary.data.is_empty() { + // No data to send, skip creating an event + return Ok(Event::default().comment("No data to send")); + } + Ok(Event::default().json_data(price_update).unwrap()) + } Err(e) => Ok(error_event(e)), } } @@ -136,18 +149,51 @@ async fn handle_aggregation_event( mut price_ids: Vec, encoding: EncodingType, parsed: bool, + benchmarks_only: bool, ) -> Result { // We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed. let available_price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state).await; price_ids.retain(|price_feed_id| available_price_feed_ids.contains(price_feed_id)); - let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( + let mut price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( &*state.state, &price_ids, RequestTime::AtSlot(event.slot()), ) .await?; + + let mut parsed_price_updates: Vec = price_feeds_with_update_data + .price_feeds + .into_iter() + .map(|price_feed| price_feed.into()) + .collect(); + + + if benchmarks_only { + // Remove those with metadata.prev_publish_time != price.publish_time from parsed_price_updates + parsed_price_updates.retain(|price_feed| { + price_feed + .metadata + .prev_publish_time + .map_or(false, |prev_time| { + prev_time != price_feed.price.publish_time + }) + }); + // Retain price id in price_ids that are in parsed_price_updates + price_ids.retain(|price_id| { + parsed_price_updates + .iter() + .any(|price_feed| price_feed.id == RpcPriceIdentifier::from(*price_id)) + }); + price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( + &*state.state, + &price_ids, + RequestTime::AtSlot(event.slot()), + ) + .await?; + } + let price_update_data = price_feeds_with_update_data.update_data; let encoded_data: Vec = price_update_data .into_iter() @@ -157,22 +203,14 @@ async fn handle_aggregation_event( encoding, data: encoded_data, }; - let parsed_price_updates: Option> = if parsed { - Some( - price_feeds_with_update_data - .price_feeds - .into_iter() - .map(|price_feed| price_feed.into()) - .collect(), - ) - } else { - None - }; - Ok(PriceUpdate { binary: binary_price_update, - parsed: parsed_price_updates, + parsed: if parsed { + Some(parsed_price_updates) + } else { + None + }, }) } From 18885278a46274890bfdbb1c47fbb42ddeb9199a Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 11 Apr 2024 22:59:13 +0900 Subject: [PATCH 3/7] update docs --- hermes/src/api.rs | 1 + hermes/src/api/rest/index.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 107c65fb21..59a37c75b0 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -106,6 +106,7 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { rest::latest_price_updates, rest::timestamp_price_updates, rest::price_feeds_metadata, + sse::price_stream_sse_handler, ), components( schemas( diff --git a/hermes/src/api/rest/index.rs b/hermes/src/api/rest/index.rs index f9dcdf1bc7..99611f821e 100644 --- a/hermes/src/api/rest/index.rs +++ b/hermes/src/api/rest/index.rs @@ -17,6 +17,7 @@ pub async fn index() -> impl IntoResponse { "/api/get_vaa?id=&publish_time=", "/api/get_vaa_ccip?data=<0x+>", "/v2/updates/price/latest?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", + "/v2/updates/price/stream?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)(&allow_unordered=false)(&benchmarks_only=false)", "/v2/updates/price/?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", "/v2/price_feeds?(query=btc)(&asset_type=crypto|equity|fx|metal|rates)", ]) From 8e8734af197655f21e0fa2dab0e3cb9b18dcc0fd Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Apr 2024 16:57:10 +0900 Subject: [PATCH 4/7] bump --- hermes/Cargo.lock | 2 +- hermes/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index c3332c8b6c..23eeeaf642 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.5.4" +version = "0.5.5" dependencies = [ "anyhow", "async-trait", diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index a62616921d..a67dabbfb2 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.5.4" +version = "0.5.5" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" From c6d37c5b0a4890f7d805e68d39f97ec93eb8dcb4 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Apr 2024 17:41:05 +0900 Subject: [PATCH 5/7] address comments --- hermes/src/api.rs | 71 +++++++++---------- hermes/src/api/rest.rs | 1 + .../src/api/rest/v2/latest_price_updates.rs | 4 +- hermes/src/api/rest/v2/mod.rs | 1 + hermes/src/api/{ => rest/v2}/sse.rs | 40 +++++++---- .../api/rest/v2/timestamp_price_updates.rs | 4 +- 6 files changed, 66 insertions(+), 55 deletions(-) rename hermes/src/api/{ => rest/v2}/sse.rs (85%) diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 59a37c75b0..1dc4c70a98 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -23,7 +23,6 @@ use { mod doc_examples; mod metrics_middleware; mod rest; -mod sse; pub mod types; mod ws; @@ -96,40 +95,40 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { #[derive(OpenApi)] #[openapi( - paths( - rest::get_price_feed, - rest::get_vaa, - rest::get_vaa_ccip, - rest::latest_price_feeds, - rest::latest_vaas, - rest::price_feed_ids, - rest::latest_price_updates, - rest::timestamp_price_updates, - rest::price_feeds_metadata, - sse::price_stream_sse_handler, - ), - components( - schemas( - rest::GetVaaCcipInput, - rest::GetVaaCcipResponse, - rest::GetVaaResponse, - types::PriceIdInput, - types::RpcPrice, - types::RpcPriceFeed, - types::RpcPriceFeedMetadata, - types::RpcPriceIdentifier, - types::EncodingType, - types::PriceUpdate, - types::BinaryPriceUpdate, - types::ParsedPriceUpdate, - types::RpcPriceFeedMetadataV2, - types::PriceFeedMetadata, - types::AssetType - ) - ), - tags( - (name = "hermes", description = "Pyth Real-Time Pricing API") - ) + paths( + rest::get_price_feed, + rest::get_vaa, + rest::get_vaa_ccip, + rest::latest_price_feeds, + rest::latest_vaas, + rest::price_feed_ids, + rest::latest_price_updates, + rest::timestamp_price_updates, + rest::price_feeds_metadata, + rest::price_stream_sse_handler, + ), + components( + schemas( + rest::GetVaaCcipInput, + rest::GetVaaCcipResponse, + rest::GetVaaResponse, + types::PriceIdInput, + types::RpcPrice, + types::RpcPriceFeed, + types::RpcPriceFeedMetadata, + types::RpcPriceIdentifier, + types::EncodingType, + types::PriceUpdate, + types::BinaryPriceUpdate, + types::ParsedPriceUpdate, + types::RpcPriceFeedMetadataV2, + types::PriceFeedMetadata, + types::AssetType + ) + ), + tags( + (name = "hermes", description = "Pyth Real-Time Pricing API") + ) )] struct ApiDoc; @@ -147,7 +146,7 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { .route("/api/price_feed_ids", get(rest::price_feed_ids)) .route( "/v2/updates/price/stream", - get(sse::price_stream_sse_handler), + get(rest::price_stream_sse_handler), ) .route("/v2/updates/price/latest", get(rest::latest_price_updates)) .route( diff --git a/hermes/src/api/rest.rs b/hermes/src/api/rest.rs index 7cee21a9f8..9d29cc9448 100644 --- a/hermes/src/api/rest.rs +++ b/hermes/src/api/rest.rs @@ -35,6 +35,7 @@ pub use { v2::{ latest_price_updates::*, price_feeds_metadata::*, + sse::*, timestamp_price_updates::*, }, }; diff --git a/hermes/src/api/rest/v2/latest_price_updates.rs b/hermes/src/api/rest/v2/latest_price_updates.rs index c5a4b3d5b3..df2f40116d 100644 --- a/hermes/src/api/rest/v2/latest_price_updates.rs +++ b/hermes/src/api/rest/v2/latest_price_updates.rs @@ -46,11 +46,11 @@ pub struct LatestPriceUpdatesQueryParams { #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")] ids: Vec, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`. #[serde(default)] encoding: EncodingType, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`. #[serde(default = "default_true")] parsed: bool, } diff --git a/hermes/src/api/rest/v2/mod.rs b/hermes/src/api/rest/v2/mod.rs index b8102741fa..a02ddbc482 100644 --- a/hermes/src/api/rest/v2/mod.rs +++ b/hermes/src/api/rest/v2/mod.rs @@ -1,3 +1,4 @@ pub mod latest_price_updates; pub mod price_feeds_metadata; +pub mod sse; pub mod timestamp_price_updates; diff --git a/hermes/src/api/sse.rs b/hermes/src/api/rest/v2/sse.rs similarity index 85% rename from hermes/src/api/sse.rs rename to hermes/src/api/rest/v2/sse.rs index 22fe6a0d39..f7167bc2d5 100644 --- a/hermes/src/api/sse.rs +++ b/hermes/src/api/rest/v2/sse.rs @@ -57,11 +57,11 @@ pub struct StreamPriceUpdatesQueryParams { #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")] ids: Vec, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`. #[serde(default)] encoding: EncodingType, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`. #[serde(default = "default_true")] parsed: bool, @@ -108,12 +108,6 @@ pub async fn price_stream_sse_handler( async move { match message { Ok(event) => { - // Check if the event is out-of-order and if unordered updates are not allowed - if let AggregationEvent::OutOfOrder { .. } = event { - if !params.allow_unordered { - return Ok(Event::default().comment("Out-of-order event skipped")); - } - } match handle_aggregation_event( event, state_clone, @@ -121,16 +115,24 @@ pub async fn price_stream_sse_handler( params.encoding, params.parsed, params.benchmarks_only, + params.allow_unordered, ) .await { Ok(price_update) => { // Check if there is any data to send - if price_update.binary.data.is_empty() { - // No data to send, skip creating an event - return Ok(Event::default().comment("No data to send")); + if let Some(update) = price_update { + if update.binary.data.is_empty() { + // No data to send, skip creating an event + return Ok(Event::default().comment("No data to send")); + } + Ok(Event::default() + .json_data(update) + .unwrap_or_else(|e| error_event(e))) + } else { + // No update available, possibly return a different event or handle accordingly + Ok(Event::default().comment("No update available")) } - Ok(Event::default().json_data(price_update).unwrap()) } Err(e) => Ok(error_event(e)), } @@ -150,7 +152,15 @@ async fn handle_aggregation_event( encoding: EncodingType, parsed: bool, benchmarks_only: bool, -) -> Result { + allow_unordered: bool, +) -> Result> { + // Handle out-of-order events + if let AggregationEvent::OutOfOrder { .. } = event { + if !allow_unordered { + return Ok(None); + } + } + // We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed. let available_price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state).await; @@ -204,14 +214,14 @@ async fn handle_aggregation_event( data: encoded_data, }; - Ok(PriceUpdate { + Ok(Some(PriceUpdate { binary: binary_price_update, parsed: if parsed { Some(parsed_price_updates) } else { None }, - }) + })) } fn error_event(e: E) -> Event { diff --git a/hermes/src/api/rest/v2/timestamp_price_updates.rs b/hermes/src/api/rest/v2/timestamp_price_updates.rs index 64ebc1cc34..f47d2e7180 100644 --- a/hermes/src/api/rest/v2/timestamp_price_updates.rs +++ b/hermes/src/api/rest/v2/timestamp_price_updates.rs @@ -58,11 +58,11 @@ pub struct TimestampPriceUpdatesQueryParams { #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")] ids: Vec, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`. #[serde(default)] encoding: EncodingType, - /// If true, include the parsed price update in the `parsed` field of each returned feed. + /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`. #[serde(default = "default_true")] parsed: bool, } From 646b7f696f9dd78fc7dc6333ac1777fe773ba353 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Apr 2024 18:27:14 +0900 Subject: [PATCH 6/7] address comments --- hermes/src/api/rest/v2/sse.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hermes/src/api/rest/v2/sse.rs b/hermes/src/api/rest/v2/sse.rs index f7167bc2d5..71ed2a76f6 100644 --- a/hermes/src/api/rest/v2/sse.rs +++ b/hermes/src/api/rest/v2/sse.rs @@ -204,6 +204,11 @@ async fn handle_aggregation_event( .await?; } + // Check if price_ids is empty after filtering and return None if it is + if price_ids.is_empty() { + return Ok(None); + } + let price_update_data = price_feeds_with_update_data.update_data; let encoded_data: Vec = price_update_data .into_iter() From 0afce369f57dcfab8c0b143a3bd9c4e03c0faa88 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 15 Apr 2024 18:32:12 +0900 Subject: [PATCH 7/7] address comments --- hermes/src/api/rest/v2/sse.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/hermes/src/api/rest/v2/sse.rs b/hermes/src/api/rest/v2/sse.rs index 71ed2a76f6..b5094b3098 100644 --- a/hermes/src/api/rest/v2/sse.rs +++ b/hermes/src/api/rest/v2/sse.rs @@ -119,21 +119,10 @@ pub async fn price_stream_sse_handler( ) .await { - Ok(price_update) => { - // Check if there is any data to send - if let Some(update) = price_update { - if update.binary.data.is_empty() { - // No data to send, skip creating an event - return Ok(Event::default().comment("No data to send")); - } - Ok(Event::default() - .json_data(update) - .unwrap_or_else(|e| error_event(e))) - } else { - // No update available, possibly return a different event or handle accordingly - Ok(Event::default().comment("No update available")) - } - } + Ok(Some(update)) => Ok(Event::default() + .json_data(update) + .unwrap_or_else(|e| error_event(e))), + Ok(None) => Ok(Event::default().comment("No update available")), Err(e) => Ok(error_event(e)), } }