@@ -58,42 +58,41 @@ pub async fn create_price_update_stream(
5858
5959 let stream = response. bytes_stream ( ) ;
6060
61- let sse_stream =
62- eventsource_stream:: EventStream :: new ( stream)
63- . map ( move |event_result| match event_result {
64- Ok ( event) => {
65- if event. event != "message" {
66- return Err ( format ! ( "Unexpected event type: {}" , event. event) . into ( ) ) ;
67- }
61+ let sse_stream = eventsource_stream:: EventStream :: new ( stream)
62+ . map ( move |event_result| match event_result {
63+ Ok ( event) => {
64+ if event. event != "message" {
65+ return Err ( format ! ( "Unexpected event type: {}" , event. event) . into ( ) ) ;
66+ }
6867
69- let data = & event. data ;
68+ let data = & event. data ;
7069
71- println ! ( "Received SSE data: {}" , data) ;
70+ println ! ( "Received SSE data: {}" , data) ;
7271
73- match serde_json:: from_str :: < crate :: models:: SseEvent > ( data) {
74- Ok ( sse_event) => {
75- if let Some ( parsed_updates) = sse_event. parsed {
76- let stream =
72+ match serde_json:: from_str :: < crate :: models:: SseEvent > ( data) {
73+ Ok ( sse_event) => {
74+ if let Some ( parsed_updates) = sse_event. parsed {
75+ let stream =
7776 parsed_updates
7877 . into_iter ( )
7978 . map ( Ok )
8079 . collect :: < Vec <
8180 Result < ParsedPriceUpdate , Box < dyn Error + Send + Sync > > ,
8281 > > ( ) ;
83- Ok ( futures_util:: stream:: iter ( stream) )
84- } else {
85- Err ( "No parsed price updates in the response" . into ( ) )
86- }
82+ Ok ( futures_util:: stream:: iter ( stream) )
83+ } else {
84+ Err ( "No parsed price updates in the response" . into ( ) )
8785 }
88- Err ( e) => Err ( format ! ( "Failed to parse price update: {}" , e) . into ( ) ) ,
8986 }
87+ Err ( e) => Err ( format ! ( "Failed to parse price update: {}" , e) . into ( ) ) ,
9088 }
91- Err ( e) => Err ( format ! ( "Error in SSE stream: {}" , e) . into ( ) ) ,
92- } )
93- . flat_map ( |result| match result {
94- Ok ( stream) => stream,
95- Err ( e) => futures_util:: stream:: iter ( vec ! [ Err ( e) ] ) ,
96- } ) ;
89+ }
90+ Err ( e) => Err ( format ! ( "Error in SSE stream: {}" , e) . into ( ) ) ,
91+ } )
92+ . flat_map ( |result| match result {
93+ Ok ( stream) => stream,
94+ Err ( e) => futures_util:: stream:: iter ( vec ! [ Err ( e) ] ) ,
95+ } ) ;
9796
9897 Ok ( sse_stream)
9998}
0 commit comments