Skip to content

Commit eeda6ff

Browse files
committed
Initial implementation for the juniper_actix subscriptions handler
1 parent f5b3fb4 commit eeda6ff

File tree

3 files changed

+670
-0
lines changed

3 files changed

+670
-0
lines changed

juniper_actix/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ documentation = "https://docs.rs/juniper_actix"
88
repository = "https://github.com/graphql-rust/juniper"
99
edition = "2018"
1010

11+
[features]
12+
subscriptions = ["juniper_subscriptions"]
1113

1214
[dependencies]
1315
actix = "0.9.0"
@@ -16,6 +18,7 @@ actix-web = { version = "2.0.0", features = ["rustls"] }
1618
actix-web-actors = "2.0.0"
1719
futures = { version = "0.3.1", features = ["compat"] }
1820
juniper = { version = "0.14.2", path = "../juniper", default-features = false }
21+
juniper_subscriptions = { path = "../juniper_subscriptions", optional = true, features = ["unstable-ws-subscriptions-transport"]}
1922
tokio = { version = "0.2", features = ["time"] }
2023
serde_json = "1.0.24"
2124
serde_derive = "1.0.75"
@@ -31,3 +34,7 @@ tokio = { version = "0.2", features = ["rt-core", "macros", "blocking"] }
3134
actix-cors = "0.2.0"
3235
actix-identity = "0.2.0"
3336
bytes = "0.5.4"
37+
38+
[[example]]
39+
name="actix_subscriptions"
40+
required-features=["subscriptions"]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#![deny(warnings)]
2+
3+
use actix_cors::Cors;
4+
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
5+
use futures::Stream;
6+
use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode};
7+
use juniper_actix::{
8+
graphiql_handler as gqli_handler, graphql_handler, playground_handler as play_handler,
9+
subscriptions::{graphql_subscriptions as sub_handler, EmptySubscriptionHandler},
10+
};
11+
use juniper_subscriptions::Coordinator;
12+
use std::{pin::Pin, time::Duration};
13+
14+
pub struct Query;
15+
16+
#[juniper::graphql_object(Context = Database)]
17+
impl Query {
18+
fn hello_world() -> &str {
19+
"Hello World!"
20+
}
21+
}
22+
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;
23+
type MyCoordinator = Coordinator<
24+
'static,
25+
Query,
26+
EmptyMutation<Database>,
27+
Subscription,
28+
Database,
29+
DefaultScalarValue,
30+
>;
31+
32+
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
33+
34+
struct Subscription;
35+
36+
#[derive(Clone)]
37+
pub struct Database;
38+
39+
impl juniper::Context for Database {}
40+
41+
impl Database {
42+
fn new() -> Self {
43+
Self {}
44+
}
45+
}
46+
47+
#[juniper::graphql_subscription(Context = Database)]
48+
impl Subscription {
49+
async fn hello_world() -> StringStream {
50+
let mut counter = 0;
51+
let stream = tokio::time::interval(Duration::from_secs(5)).map(move |_| {
52+
counter += 1;
53+
if counter % 2 == 0 {
54+
Ok(String::from("World!"))
55+
} else {
56+
Ok(String::from("Hello"))
57+
}
58+
});
59+
60+
Box::pin(stream)
61+
}
62+
}
63+
64+
fn schema() -> Schema {
65+
Schema::new(Query {}, EmptyMutation::new(), Subscription {})
66+
}
67+
68+
async fn graphiql_handler() -> Result<HttpResponse, Error> {
69+
gqli_handler("/", Some("/subscriptions")).await
70+
}
71+
async fn playground_handler() -> Result<HttpResponse, Error> {
72+
play_handler("/", Some("/subscriptions")).await
73+
}
74+
75+
async fn graphql(
76+
req: actix_web::HttpRequest,
77+
payload: actix_web::web::Payload,
78+
schema: web::Data<Schema>,
79+
) -> Result<HttpResponse, Error> {
80+
let context = Database::new();
81+
graphql_handler(&schema, &context, req, payload).await
82+
}
83+
84+
async fn graphql_subscriptions(
85+
coordinator: web::Data<MyCoordinator>,
86+
stream: web::Payload,
87+
req: HttpRequest,
88+
) -> Result<HttpResponse, Error> {
89+
let context = Database::new();
90+
unsafe {
91+
sub_handler(
92+
coordinator,
93+
context,
94+
stream,
95+
req,
96+
Some(EmptySubscriptionHandler::default()),
97+
)
98+
}
99+
.await
100+
}
101+
102+
#[actix_rt::main]
103+
async fn main() -> std::io::Result<()> {
104+
::std::env::set_var("RUST_LOG", "actix_web=info");
105+
env_logger::init();
106+
let server = HttpServer::new(move || {
107+
App::new()
108+
.data(schema())
109+
.data(juniper_subscriptions::Coordinator::new(schema()))
110+
.wrap(middleware::Compress::default())
111+
.wrap(middleware::Logger::default())
112+
.wrap(
113+
Cors::new()
114+
.allowed_methods(vec!["POST", "GET"])
115+
.supports_credentials()
116+
.max_age(3600)
117+
.finish(),
118+
)
119+
.service(
120+
web::resource("/")
121+
.route(web::post().to(graphql))
122+
.route(web::get().to(graphql)),
123+
)
124+
.service(web::resource("/playground").route(web::get().to(playground_handler)))
125+
.service(web::resource("/graphiql").route(web::get().to(graphiql_handler)))
126+
.service(web::resource("/subscriptions").to(graphql_subscriptions))
127+
});
128+
server.bind("127.0.0.1:8080").unwrap().run().await
129+
}

0 commit comments

Comments
 (0)