Skip to content

Commit 77ac433

Browse files
committed
Changes in the actix subscriptions implementation
1 parent 464405e commit 77ac433

File tree

3 files changed

+172
-211
lines changed

3 files changed

+172
-211
lines changed

juniper_actix/examples/actix_subscriptions.rs

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use futures::Stream;
66
use juniper::{DefaultScalarValue, FieldError, RootNode};
77
use juniper_actix::{
88
graphiql_handler as gqli_handler, graphql_handler, playground_handler as play_handler,
9-
subscriptions::{graphql_subscriptions as sub_handler, EmptySubscriptionHandler},
9+
subscriptions::{
10+
graphql_subscriptions as sub_handler, GraphQLWSSession, SubscriptionState,
11+
SubscriptionStateHandler,
12+
},
1013
};
1114
use juniper_subscriptions::Coordinator;
1215
use std::collections::hash_map::Entry;
@@ -23,13 +26,15 @@ type MyCoordinator =
2326
struct ChatRoom {
2427
pub name: String,
2528
pub channel: (Sender<Msg>, Receiver<Msg>),
29+
pub msgs: Vec<Msg>,
2630
}
2731

2832
impl ChatRoom {
2933
pub fn new(name: String) -> Self {
3034
Self {
3135
name,
3236
channel: channel(16),
37+
msgs: Vec::new(),
3338
}
3439
}
3540
}
@@ -58,6 +63,14 @@ impl Query {
5863
.map(|(_, chat_room)| chat_room.name.clone())
5964
.collect()
6065
}
66+
67+
pub fn msgs_from_room(room_name: String, ctx: &Context) -> Option<Vec<Msg>> {
68+
ctx.chat_rooms
69+
.lock()
70+
.unwrap()
71+
.get(&room_name)
72+
.map(|chat_room| chat_room.msgs.clone())
73+
}
6174
}
6275

6376
struct Mutation;
@@ -68,20 +81,18 @@ impl Mutation {
6881
ctx.chat_rooms
6982
.lock()
7083
.unwrap()
71-
.get(&room_name)
84+
.get_mut(&room_name)
7285
.map(|chat_room| {
7386
let now = SystemTime::now()
7487
.duration_since(UNIX_EPOCH)
7588
.unwrap_or(Duration::new(0, 0));
76-
chat_room
77-
.channel
78-
.0
79-
.send(Msg {
80-
sender,
81-
value: msg,
82-
date: format!("{}", now.as_secs()),
83-
})
84-
.is_ok()
89+
let msg = Msg {
90+
sender,
91+
value: msg,
92+
date: format!("{}", now.as_secs()),
93+
};
94+
chat_room.msgs.push(msg.clone());
95+
chat_room.channel.0.send(msg).is_ok()
8596
})
8697
.is_some()
8798
}
@@ -96,7 +107,7 @@ struct Msg {
96107

97108
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
98109

99-
type VecStringStream = Pin<Box<dyn Stream<Item = Result<Vec<Msg>, FieldError>> + Send>>;
110+
type MsgStream = Pin<Box<dyn Stream<Item = Result<Msg, FieldError>> + Send>>;
100111

101112
struct Subscription;
102113

@@ -116,19 +127,14 @@ impl Subscription {
116127
Box::pin(stream)
117128
}
118129

119-
async fn chat_room(room_name: String, ctx: &Context) -> VecStringStream {
120-
let mut messages: Vec<Msg> = Vec::new();
130+
async fn new_messages(room_name: String, ctx: &Context) -> MsgStream {
121131
let channel_rx = {
122132
match ctx.chat_rooms.lock().unwrap().entry(room_name.clone()) {
123133
Entry::Occupied(o) => o.get().channel.0.subscribe(),
124134
Entry::Vacant(v) => v.insert(ChatRoom::new(room_name)).channel.0.subscribe(),
125135
}
126136
};
127-
let stream = channel_rx.map(move |msg| {
128-
let msg = msg?;
129-
messages.push(msg);
130-
Ok(messages.clone())
131-
});
137+
let stream = channel_rx.map(|msg| Ok(msg?));
132138
Box::pin(stream)
133139
}
134140
}
@@ -138,7 +144,7 @@ fn schema() -> Schema {
138144
}
139145

140146
async fn graphiql_handler() -> Result<HttpResponse, Error> {
141-
gqli_handler("/", Some("/subscriptions")).await
147+
gqli_handler("/", Some("ws://localhost:8080/subscriptions")).await
142148
}
143149
async fn playground_handler() -> Result<HttpResponse, Error> {
144150
play_handler("/", Some("/subscriptions")).await
@@ -154,23 +160,34 @@ async fn graphql(
154160
graphql_handler(&schema, &context, req, payload).await
155161
}
156162

163+
#[derive(Default)]
164+
struct HandlerExample {}
165+
166+
impl<Context> SubscriptionStateHandler<Context> for HandlerExample
167+
where
168+
Context: Send + Sync,
169+
{
170+
fn handle(&self, state: SubscriptionState<Context>) -> Result<(), Box<dyn std::error::Error>> {
171+
match state {
172+
SubscriptionState::OnConnection(_, _) => println!("OnConnection"),
173+
SubscriptionState::OnOperation(_) => println!("OnOperation"),
174+
SubscriptionState::OnOperationComplete(_) => println!("OnOperationComplete"),
175+
SubscriptionState::OnDisconnect(_) => println!("OnDisconnect"),
176+
};
177+
Ok(())
178+
}
179+
}
180+
157181
async fn graphql_subscriptions(
158182
coordinator: web::Data<MyCoordinator>,
159183
stream: web::Payload,
160184
req: HttpRequest,
161185
chat_rooms: web::Data<Mutex<HashMap<String, ChatRoom>>>,
162186
) -> Result<HttpResponse, Error> {
163187
let context = Context::new(chat_rooms.into_inner());
164-
let handler: Option<EmptySubscriptionHandler> = None;
165-
sub_handler(
166-
coordinator,
167-
context,
168-
stream,
169-
req,
170-
handler,
171-
Some(Duration::from_secs(5)),
172-
)
173-
.await
188+
let actor = GraphQLWSSession::new(coordinator.into_inner(), context);
189+
let actor = actor.with_handler(HandlerExample::default());
190+
sub_handler(actor, stream, req).await
174191
}
175192

176193
#[actix_rt::main]

0 commit comments

Comments
 (0)