@@ -6,7 +6,10 @@ use futures::Stream;
6
6
use juniper:: { DefaultScalarValue , FieldError , RootNode } ;
7
7
use juniper_actix:: {
8
8
graphiql_handler as gqli_handler, graphql_handler, playground_handler as play_handler,
9
- subscriptions:: { graphql_subscriptions as sub_handler, EmptySubscriptionHandler } ,
9
+ subscriptions:: {
10
+ graphql_subscriptions as sub_handler, GraphQLWSSession , SubscriptionState ,
11
+ SubscriptionStateHandler ,
12
+ } ,
10
13
} ;
11
14
use juniper_subscriptions:: Coordinator ;
12
15
use std:: collections:: hash_map:: Entry ;
@@ -23,13 +26,15 @@ type MyCoordinator =
23
26
struct ChatRoom {
24
27
pub name : String ,
25
28
pub channel : ( Sender < Msg > , Receiver < Msg > ) ,
29
+ pub msgs : Vec < Msg > ,
26
30
}
27
31
28
32
impl ChatRoom {
29
33
pub fn new ( name : String ) -> Self {
30
34
Self {
31
35
name,
32
36
channel : channel ( 16 ) ,
37
+ msgs : Vec :: new ( ) ,
33
38
}
34
39
}
35
40
}
@@ -58,6 +63,14 @@ impl Query {
58
63
. map ( |( _, chat_room) | chat_room. name . clone ( ) )
59
64
. collect ( )
60
65
}
66
+
67
+ pub fn msgs_from_room ( room_name : String , ctx : & Context ) -> Option < Vec < Msg > > {
68
+ ctx. chat_rooms
69
+ . lock ( )
70
+ . unwrap ( )
71
+ . get ( & room_name)
72
+ . map ( |chat_room| chat_room. msgs . clone ( ) )
73
+ }
61
74
}
62
75
63
76
struct Mutation ;
@@ -68,20 +81,18 @@ impl Mutation {
68
81
ctx. chat_rooms
69
82
. lock ( )
70
83
. unwrap ( )
71
- . get ( & room_name)
84
+ . get_mut ( & room_name)
72
85
. map ( |chat_room| {
73
86
let now = SystemTime :: now ( )
74
87
. duration_since ( UNIX_EPOCH )
75
88
. unwrap_or ( Duration :: new ( 0 , 0 ) ) ;
76
- chat_room
77
- . channel
78
- . 0
79
- . send ( Msg {
80
- sender,
81
- value : msg,
82
- date : format ! ( "{}" , now. as_secs( ) ) ,
83
- } )
84
- . is_ok ( )
89
+ let msg = Msg {
90
+ sender,
91
+ value : msg,
92
+ date : format ! ( "{}" , now. as_secs( ) ) ,
93
+ } ;
94
+ chat_room. msgs . push ( msg. clone ( ) ) ;
95
+ chat_room. channel . 0 . send ( msg) . is_ok ( )
85
96
} )
86
97
. is_some ( )
87
98
}
@@ -96,7 +107,7 @@ struct Msg {
96
107
97
108
type StringStream = Pin < Box < dyn Stream < Item = Result < String , FieldError > > + Send > > ;
98
109
99
- type VecStringStream = Pin < Box < dyn Stream < Item = Result < Vec < Msg > , FieldError > > + Send > > ;
110
+ type MsgStream = Pin < Box < dyn Stream < Item = Result < Msg , FieldError > > + Send > > ;
100
111
101
112
struct Subscription ;
102
113
@@ -116,19 +127,14 @@ impl Subscription {
116
127
Box :: pin ( stream)
117
128
}
118
129
119
- async fn chat_room ( room_name : String , ctx : & Context ) -> VecStringStream {
120
- let mut messages: Vec < Msg > = Vec :: new ( ) ;
130
+ async fn new_messages ( room_name : String , ctx : & Context ) -> MsgStream {
121
131
let channel_rx = {
122
132
match ctx. chat_rooms . lock ( ) . unwrap ( ) . entry ( room_name. clone ( ) ) {
123
133
Entry :: Occupied ( o) => o. get ( ) . channel . 0 . subscribe ( ) ,
124
134
Entry :: Vacant ( v) => v. insert ( ChatRoom :: new ( room_name) ) . channel . 0 . subscribe ( ) ,
125
135
}
126
136
} ;
127
- let stream = channel_rx. map ( move |msg| {
128
- let msg = msg?;
129
- messages. push ( msg) ;
130
- Ok ( messages. clone ( ) )
131
- } ) ;
137
+ let stream = channel_rx. map ( |msg| Ok ( msg?) ) ;
132
138
Box :: pin ( stream)
133
139
}
134
140
}
@@ -138,7 +144,7 @@ fn schema() -> Schema {
138
144
}
139
145
140
146
async fn graphiql_handler ( ) -> Result < HttpResponse , Error > {
141
- gqli_handler ( "/" , Some ( "/subscriptions" ) ) . await
147
+ gqli_handler ( "/" , Some ( "ws://localhost:8080 /subscriptions" ) ) . await
142
148
}
143
149
async fn playground_handler ( ) -> Result < HttpResponse , Error > {
144
150
play_handler ( "/" , Some ( "/subscriptions" ) ) . await
@@ -154,23 +160,34 @@ async fn graphql(
154
160
graphql_handler ( & schema, & context, req, payload) . await
155
161
}
156
162
163
+ #[ derive( Default ) ]
164
+ struct HandlerExample { }
165
+
166
+ impl < Context > SubscriptionStateHandler < Context > for HandlerExample
167
+ where
168
+ Context : Send + Sync ,
169
+ {
170
+ fn handle ( & self , state : SubscriptionState < Context > ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
171
+ match state {
172
+ SubscriptionState :: OnConnection ( _, _) => println ! ( "OnConnection" ) ,
173
+ SubscriptionState :: OnOperation ( _) => println ! ( "OnOperation" ) ,
174
+ SubscriptionState :: OnOperationComplete ( _) => println ! ( "OnOperationComplete" ) ,
175
+ SubscriptionState :: OnDisconnect ( _) => println ! ( "OnDisconnect" ) ,
176
+ } ;
177
+ Ok ( ( ) )
178
+ }
179
+ }
180
+
157
181
async fn graphql_subscriptions (
158
182
coordinator : web:: Data < MyCoordinator > ,
159
183
stream : web:: Payload ,
160
184
req : HttpRequest ,
161
185
chat_rooms : web:: Data < Mutex < HashMap < String , ChatRoom > > > ,
162
186
) -> Result < HttpResponse , Error > {
163
187
let context = Context :: new ( chat_rooms. into_inner ( ) ) ;
164
- let handler: Option < EmptySubscriptionHandler > = None ;
165
- sub_handler (
166
- coordinator,
167
- context,
168
- stream,
169
- req,
170
- handler,
171
- Some ( Duration :: from_secs ( 5 ) ) ,
172
- )
173
- . await
188
+ let actor = GraphQLWSSession :: new ( coordinator. into_inner ( ) , context) ;
189
+ let actor = actor. with_handler ( HandlerExample :: default ( ) ) ;
190
+ sub_handler ( actor, stream, req) . await
174
191
}
175
192
176
193
#[ actix_rt:: main]
0 commit comments