@@ -413,124 +413,89 @@ where
413
413
#[ cfg( test) ]
414
414
mod tests {
415
415
use super :: * ;
416
+ use actix_web:: HttpRequest ;
416
417
use actix_web:: { test, App } ;
418
+ use actix_web_actors:: ws:: { Frame , Message } ;
417
419
use futures:: StreamExt ;
420
+ use futures:: { SinkExt , Stream } ;
421
+ use juniper:: {
422
+ tests:: model:: Database , tests:: schema:: Query , DefaultScalarValue , EmptyMutation ,
423
+ FieldError , RootNode ,
424
+ } ;
425
+ use juniper_subscriptions:: Coordinator ;
426
+ use std:: { pin:: Pin , time:: Duration } ;
427
+ type Schema = RootNode < ' static , Query , EmptyMutation < Database > , Subscription > ;
428
+ type StringStream = Pin < Box < dyn Stream < Item = Result < String , FieldError > > + Send > > ;
429
+ type MyCoordinator = Coordinator <
430
+ ' static ,
431
+ Query ,
432
+ EmptyMutation < Database > ,
433
+ Subscription ,
434
+ Database ,
435
+ DefaultScalarValue ,
436
+ > ;
437
+ struct Subscription ;
418
438
419
- #[ actix_rt:: test]
420
- async fn expected_communication ( ) {
421
- use actix_web:: HttpRequest ;
422
- use actix_web_actors:: ws:: { Frame , Message } ;
423
- use futures:: { SinkExt , Stream } ;
424
- use juniper:: { DefaultScalarValue , EmptyMutation , FieldError , RootNode } ;
425
- use juniper_subscriptions:: Coordinator ;
426
- use std:: { pin:: Pin , time:: Duration } ;
427
-
428
- pub struct Query ;
429
-
430
- #[ juniper:: graphql_object( Context = Database ) ]
431
- impl Query {
432
- fn hello_world ( ) -> & str {
433
- "Hello World!"
434
- }
435
- }
436
- type Schema = RootNode < ' static , Query , EmptyMutation < Database > , Subscription > ;
437
- type StringStream = Pin < Box < dyn Stream < Item = Result < String , FieldError > > + Send > > ;
438
- type MyCoordinator = Coordinator <
439
- ' static ,
440
- Query ,
441
- EmptyMutation < Database > ,
442
- Subscription ,
443
- Database ,
444
- DefaultScalarValue ,
445
- > ;
446
- struct Subscription ;
447
-
448
- #[ derive( Clone ) ]
449
- pub struct Database ;
450
-
451
- impl juniper:: Context for Database { }
452
-
453
- impl Database {
454
- fn new ( ) -> Self {
455
- Self { }
456
- }
439
+ #[ juniper:: graphql_subscription( Context = Database ) ]
440
+ impl Subscription {
441
+ async fn hello_world ( ) -> StringStream {
442
+ let mut counter = 0 ;
443
+ let stream = tokio:: time:: interval ( Duration :: from_secs ( 2 ) ) . map ( move |_| {
444
+ counter += 1 ;
445
+ if counter % 2 == 0 {
446
+ Ok ( String :: from ( "World!" ) )
447
+ } else {
448
+ Ok ( String :: from ( "Hello" ) )
449
+ }
450
+ } ) ;
451
+ Box :: pin ( stream)
457
452
}
453
+ }
458
454
459
- #[ juniper:: graphql_subscription( Context = Database ) ]
460
- impl Subscription {
461
- async fn hello_world ( ) -> StringStream {
462
- let mut counter = 0 ;
463
- let stream = tokio:: time:: interval ( Duration :: from_secs ( 2 ) ) . map ( move |_| {
464
- counter += 1 ;
465
- if counter % 2 == 0 {
466
- Ok ( String :: from ( "World!" ) )
467
- } else {
468
- Ok ( String :: from ( "Hello" ) )
469
- }
470
- } ) ;
471
- Box :: pin ( stream)
472
- }
473
- }
455
+ async fn gql_subscriptions (
456
+ coordinator : web:: Data < MyCoordinator > ,
457
+ stream : web:: Payload ,
458
+ req : HttpRequest ,
459
+ ) -> Result < HttpResponse , Error > {
460
+ let context = Database :: new ( ) ;
461
+ graphql_subscriptions (
462
+ coordinator,
463
+ context,
464
+ stream,
465
+ req,
466
+ Some ( EmptySubscriptionHandler :: default ( ) ) ,
467
+ None ,
468
+ )
469
+ . await
470
+ }
474
471
472
+ fn test_server ( ) -> test:: TestServer {
475
473
let schema: Schema =
476
474
RootNode :: new ( Query , EmptyMutation :: < Database > :: new ( ) , Subscription { } ) ;
477
475
478
- async fn gql_subscriptions (
479
- coordinator : web:: Data < MyCoordinator > ,
480
- stream : web:: Payload ,
481
- req : HttpRequest ,
482
- ) -> Result < HttpResponse , Error > {
483
- let context = Database :: new ( ) ;
484
- graphql_subscriptions (
485
- coordinator,
486
- context,
487
- stream,
488
- req,
489
- Some ( EmptySubscriptionHandler :: default ( ) ) ,
490
- None ,
491
- )
492
- . await
493
- }
494
476
let coord = web:: Data :: new ( juniper_subscriptions:: Coordinator :: new ( schema) ) ;
495
- let mut app = test:: start ( move || {
477
+ test:: start ( move || {
496
478
App :: new ( )
497
479
. app_data ( coord. clone ( ) )
498
480
. service ( web:: resource ( "/subscriptions" ) . to ( gql_subscriptions) )
499
- } ) ;
500
- let mut ws = app. ws_at ( "/subscriptions" ) . await . unwrap ( ) ;
501
- let messages_to_be_sent = vec ! [
502
- String :: from( r#"{"type":"connection_init","payload":{}}"# ) ,
503
- String :: from(
504
- r#"{"id":"1","type":"start","payload":{"variables":{},"extensions":{},"operationName":"hello","query":"subscription hello { helloWorld}"}}"# ,
505
- ) ,
506
- String :: from(
507
- r#"{"id":"2","type":"start","payload":{"variables":{},"extensions":{},"operationName":"hello","query":"subscription hello { helloWorld}"}}"# ,
508
- ) ,
509
- String :: from( r#"{"id":"1","type":"stop"}"# ) ,
510
- String :: from( r#"{"type":"connection_terminate"}"# ) ,
511
- ] ;
512
- let messages_to_be_received = vec ! [
513
- vec![
514
- Some ( bytes:: Bytes :: from(
515
- r#"{"type":"connection_ack", "payload": null }"# ,
516
- ) ) ,
517
- Some ( bytes:: Bytes :: from( r#"{"type":"ka", "payload": null }"# ) ) ,
518
- ] ,
519
- vec![ Some ( bytes:: Bytes :: from(
520
- r#"{"type":"data","id":"1","payload":{"data":{"helloWorld":"Hello"}} }"# ,
521
- ) ) ] ,
522
- vec![ Some ( bytes:: Bytes :: from(
523
- r#"{"type":"data","id":"2","payload":{"data":{"helloWorld":"Hello"}} }"# ,
524
- ) ) ] ,
525
- vec![ Some ( bytes:: Bytes :: from(
526
- r#"{"type":"complete","id":"1","payload":null}"# ,
527
- ) ) ] ,
528
- vec![ None ] ,
529
- ] ;
481
+ } )
482
+ }
483
+
484
+ fn received_msg ( msg : & ' static str ) -> Option < bytes:: Bytes > {
485
+ Some ( bytes:: Bytes :: from ( msg) )
486
+ }
530
487
531
- for ( index, msg_to_be_sent) in messages_to_be_sent. into_iter ( ) . enumerate ( ) {
532
- let expected_msgs = messages_to_be_received. get ( index) . unwrap ( ) ;
533
- ws. send ( Message :: Text ( msg_to_be_sent) ) . await . unwrap ( ) ;
488
+ async fn test_subscription (
489
+ msgs_to_send : Vec < & str > ,
490
+ msgs_to_receive : Vec < Vec < Option < bytes:: Bytes > > > ,
491
+ ) {
492
+ let mut app = test_server ( ) ;
493
+ let mut ws = app. ws_at ( "/subscriptions" ) . await . unwrap ( ) ;
494
+ for ( index, msg_to_be_sent) in msgs_to_send. into_iter ( ) . enumerate ( ) {
495
+ let expected_msgs = msgs_to_receive. get ( index) . unwrap ( ) ;
496
+ ws. send ( Message :: Text ( msg_to_be_sent. to_string ( ) ) )
497
+ . await
498
+ . unwrap ( ) ;
534
499
for expected_msg in expected_msgs {
535
500
let ( item, ws_stream) = ws. into_future ( ) . await ;
536
501
ws = ws_stream;
@@ -547,4 +512,68 @@ mod tests {
547
512
}
548
513
}
549
514
}
515
+
516
+ #[ actix_rt:: test]
517
+ async fn basic_connection ( ) {
518
+ let msgs_to_send = vec ! [
519
+ r#"{"type":"connection_init","payload":{}}"# ,
520
+ r#"{"type":"connection_terminate"}"# ,
521
+ ] ;
522
+ let msgs_to_receive = vec ! [
523
+ vec![
524
+ received_msg( r#"{"type":"connection_ack", "payload": null }"# ) ,
525
+ received_msg( r#"{"type":"ka", "payload": null }"# ) ,
526
+ ] ,
527
+ vec![ None ] ,
528
+ ] ;
529
+ test_subscription ( msgs_to_send, msgs_to_receive) . await ;
530
+ }
531
+
532
+ #[ actix_rt:: test]
533
+ async fn basic_subscription ( ) {
534
+ let msgs_to_send = vec ! [
535
+ r#"{"type":"connection_init","payload":{}}"# ,
536
+ r#"{"id":"1","type":"start","payload":{"variables":{},"extensions":{},"operationName":"hello","query":"subscription hello { helloWorld}"}}"# ,
537
+ r#"{"type":"connection_terminate"}"# ,
538
+ ] ;
539
+ let msgs_to_receive = vec ! [
540
+ vec![
541
+ received_msg( r#"{"type":"connection_ack", "payload": null }"# ) ,
542
+ received_msg( r#"{"type":"ka", "payload": null }"# ) ,
543
+ ] ,
544
+ vec![ received_msg(
545
+ r#"{"type":"data","id":"1","payload":{"data":{"helloWorld":"Hello"}} }"# ,
546
+ ) ] ,
547
+ vec![ None ] ,
548
+ ] ;
549
+ test_subscription ( msgs_to_send, msgs_to_receive) . await ;
550
+ }
551
+
552
+ #[ actix_rt:: test]
553
+ async fn conn_with_two_subscriptions ( ) {
554
+ let msgs_to_send = vec ! [
555
+ r#"{"type":"connection_init","payload":{}}"# ,
556
+ r#"{"id":"1","type":"start","payload":{"variables":{},"extensions":{},"operationName":"hello","query":"subscription hello { helloWorld}"}}"# ,
557
+ r#"{"id":"2","type":"start","payload":{"variables":{},"extensions":{},"operationName":"hello","query":"subscription hello { helloWorld}"}}"# ,
558
+ r#"{"id":"1","type":"stop"}"# ,
559
+ r#"{"type":"connection_terminate"}"# ,
560
+ ] ;
561
+ let msgs_to_receive = vec ! [
562
+ vec![
563
+ received_msg( r#"{"type":"connection_ack", "payload": null }"# ) ,
564
+ received_msg( r#"{"type":"ka", "payload": null }"# ) ,
565
+ ] ,
566
+ vec![ received_msg(
567
+ r#"{"type":"data","id":"1","payload":{"data":{"helloWorld":"Hello"}} }"# ,
568
+ ) ] ,
569
+ vec![ received_msg(
570
+ r#"{"type":"data","id":"2","payload":{"data":{"helloWorld":"Hello"}} }"# ,
571
+ ) ] ,
572
+ vec![ received_msg(
573
+ r#"{"type":"complete","id":"1","payload":null}"# ,
574
+ ) ] ,
575
+ vec![ None ] ,
576
+ ] ;
577
+ test_subscription ( msgs_to_send, msgs_to_receive) . await ;
578
+ }
550
579
}
0 commit comments