Skip to content

Simplify SubscriptionConnection #719

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions juniper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ pub use crate::{
marker::{self, GraphQLUnion},
scalars::{EmptyMutation, EmptySubscription, ID},
subscriptions::{
GraphQLSubscriptionType, GraphQLSubscriptionValue, SubscriptionConnection,
SubscriptionCoordinator,
ExecutionOutput, GraphQLSubscriptionType, GraphQLSubscriptionValue,
SubscriptionConnection, SubscriptionCoordinator,
},
},
validation::RuleError,
Expand Down
34 changes: 29 additions & 5 deletions juniper/src/types/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
use futures::{future, stream};
use serde::Serialize;

use crate::{
http::{GraphQLRequest, GraphQLResponse},
http::GraphQLRequest,
parser::Spanning,
types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue},
Arguments, BoxFuture, DefaultScalarValue, Executor, FieldError, Object, ScalarValue, Selection,
Value, ValuesStream,
Arguments, BoxFuture, DefaultScalarValue, ExecutionError, Executor, FieldError, Object,
ScalarValue, Selection, Value, ValuesStream,
};

/// Represents the result of executing a GraphQL operation (after parsing and validating has been
/// done).
#[derive(Debug, Serialize)]
pub struct ExecutionOutput<S> {
/// The output data.
pub data: Value<S>,

/// The errors that occurred. Note that the presence of errors does not mean there is no data.
/// The output can have both data and errors.
#[serde(bound(serialize = "S: ScalarValue"))]
pub errors: Vec<ExecutionError<S>>,
}

impl<S> ExecutionOutput<S> {
/// Creates execution output from data, with no errors.
pub fn from_data(data: Value<S>) -> Self {
Self {
data,
errors: vec![],
}
}
}

/// Global subscription coordinator trait.
///
/// With regular queries we could get away with not having some in-between
Expand All @@ -33,7 +57,7 @@ where
{
/// Type of [`SubscriptionConnection`]s this [`SubscriptionCoordinator`]
/// returns
type Connection: SubscriptionConnection<'a, S>;
type Connection: SubscriptionConnection<S>;

/// Type of error while trying to spawn [`SubscriptionConnection`]
type Error;
Expand All @@ -58,7 +82,7 @@ where
///
/// It can be treated as [`futures::Stream`] yielding [`GraphQLResponse`]s in
/// server integration crates.
pub trait SubscriptionConnection<'a, S>: futures::Stream<Item = GraphQLResponse<'a, S>> {}
pub trait SubscriptionConnection<S>: futures::Stream<Item = ExecutionOutput<S>> {}

/// Extension of [`GraphQLValue`] trait with asynchronous [subscription][1] execution logic.
/// It should be used with [`GraphQLValue`] in order to implement [subscription][1] resolvers on
Expand Down
143 changes: 57 additions & 86 deletions juniper_subscriptions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use std::{

use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _};
use juniper::{
http::{GraphQLRequest, GraphQLResponse},
BoxFuture, ExecutionError, GraphQLError, GraphQLSubscriptionType, GraphQLTypeAsync, Object,
ScalarValue, SubscriptionConnection, SubscriptionCoordinator, Value, ValuesStream,
http::GraphQLRequest, BoxFuture, ExecutionError, ExecutionOutput, GraphQLError,
GraphQLSubscriptionType, GraphQLTypeAsync, Object, ScalarValue, SubscriptionConnection,
SubscriptionCoordinator, Value, ValuesStream,
};

/// Simple [`SubscriptionCoordinator`] implementation:
Expand Down Expand Up @@ -88,8 +88,8 @@ where

/// Simple [`SubscriptionConnection`] implementation.
///
/// Resolves `Value<ValuesStream>` into `Stream<Item = GraphQLResponse>` using the following
/// logic:
/// Resolves `Value<ValuesStream>` into `Stream<Item = ExecutionOutput<S>>` using
/// the following logic:
///
/// [`Value::Null`] - returns [`Value::Null`] once
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
Expand All @@ -98,7 +98,7 @@ where
/// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object
/// `Value::Object<Value::Object<_>>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects
pub struct Connection<'a, S> {
stream: Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>>,
stream: Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>,
}

impl<'a, S> Connection<'a, S>
Expand All @@ -113,16 +113,13 @@ where
}
}

impl<'a, S> SubscriptionConnection<'a, S> for Connection<'a, S> where
S: ScalarValue + Send + Sync + 'a
{
}
impl<'a, S> SubscriptionConnection<S> for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a {}

impl<'a, S> Stream for Connection<'a, S>
where
S: ScalarValue + Send + Sync + 'a,
{
type Item = GraphQLResponse<'a, S>;
type Item = ExecutionOutput<S>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// this is safe as stream is only mutated here and is not moved anywhere
Expand All @@ -132,7 +129,7 @@ where
}
}

/// Creates [`futures::Stream`] that yields [`GraphQLResponse`]s depending on the given [`Value`]:
/// Creates [`futures::Stream`] that yields `ExecutionOutput<S>`s depending on the given [`Value`]:
///
/// [`Value::Null`] - returns [`Value::Null`] once
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
Expand All @@ -143,23 +140,28 @@ where
fn whole_responses_stream<'a, S>(
stream: Value<ValuesStream<'a, S>>,
errors: Vec<ExecutionError<S>>,
) -> Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>>
) -> Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>
where
S: ScalarValue + Send + Sync + 'a,
{
if !errors.is_empty() {
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
Ok((Value::Null, errors)),
))));
return stream::once(future::ready(ExecutionOutput {
data: Value::null(),
errors,
}))
.boxed();
}

match stream {
Value::Null => Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
Ok((Value::Null, vec![])),
Value::Null => Box::pin(stream::once(future::ready(ExecutionOutput::from_data(
Value::null(),
)))),
Value::Scalar(s) => Box::pin(s.map(|res| match res {
Ok(val) => GraphQLResponse::from_result(Ok((val, vec![]))),
Err(err) => GraphQLResponse::from_result(Ok((Value::Null, vec![err]))),
Ok(val) => ExecutionOutput::from_data(val),
Err(err) => ExecutionOutput {
data: Value::null(),
errors: vec![err],
},
})),
Value::List(list) => {
let mut streams = vec![];
Expand All @@ -171,9 +173,8 @@ where
Value::Object(mut object) => {
let obj_len = object.field_count();
if obj_len == 0 {
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
Ok((Value::Null, vec![])),
))));
return stream::once(future::ready(ExecutionOutput::from_data(Value::null())))
.boxed();
}

let mut filled_count = 0;
Expand All @@ -182,7 +183,7 @@ where
ready_vec.push(None);
}

let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<GraphQLResponse<'a, S>>> {
let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<ExecutionOutput<S>>> {
let mut obj_iterator = object.iter_mut();

// Due to having to modify `ready_vec` contents (by-move pattern)
Expand Down Expand Up @@ -233,10 +234,7 @@ where
}
});
let obj = Object::from_iter(ready_vec_iterator);
Poll::Ready(Some(GraphQLResponse::from_result(Ok((
Value::Object(obj),
vec![],
)))))
Poll::Ready(Some(ExecutionOutput::from_data(Value::Object(obj))))
} else {
Poll::Pending
}
Expand All @@ -256,9 +254,13 @@ mod whole_responses_stream {

#[tokio::test]
async fn with_error() {
let expected = vec![GraphQLResponse::<DefaultScalarValue>::error(
FieldError::new("field error", Value::Null),
)];
let expected = vec![ExecutionOutput {
data: Value::<DefaultScalarValue>::Null,
errors: vec![ExecutionError::at_origin(FieldError::new(
"field error",
Value::Null,
))],
}];
let expected = serde_json::to_string(&expected).unwrap();

let result = whole_responses_stream::<DefaultScalarValue>(
Expand All @@ -277,10 +279,9 @@ mod whole_responses_stream {

#[tokio::test]
async fn value_null() {
let expected = vec![GraphQLResponse::<DefaultScalarValue>::from_result(Ok((
Value::Null,
vec![],
)))];
let expected = vec![ExecutionOutput::from_data(
Value::<DefaultScalarValue>::Null,
)];
let expected = serde_json::to_string(&expected).unwrap();

let result = whole_responses_stream::<DefaultScalarValue>(Value::Null, vec![])
Expand All @@ -296,26 +297,11 @@ mod whole_responses_stream {
#[tokio::test]
async fn value_scalar() {
let expected = vec![
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(1i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(2i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(3i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(4i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(5i32)),
vec![],
))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(3i32))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(5i32))),
];
let expected = serde_json::to_string(&expected).unwrap();

Expand All @@ -340,19 +326,10 @@ mod whole_responses_stream {
#[tokio::test]
async fn value_list() {
let expected = vec![
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(1i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(2i32)),
vec![],
))),
GraphQLResponse::from_result(Ok((Value::Null, vec![]))),
GraphQLResponse::from_result(Ok((
Value::Scalar(DefaultScalarValue::Int(4i32)),
vec![],
))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
ExecutionOutput::from_data(Value::null()),
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
];
let expected = serde_json::to_string(&expected).unwrap();

Expand Down Expand Up @@ -380,25 +357,19 @@ mod whole_responses_stream {
#[tokio::test]
async fn value_object() {
let expected = vec![
GraphQLResponse::from_result(Ok((
Value::Object(Object::from_iter(
vec![
("one", Value::Scalar(DefaultScalarValue::Int(1i32))),
("two", Value::Scalar(DefaultScalarValue::Int(1i32))),
]
.into_iter(),
)),
vec![],
ExecutionOutput::from_data(Value::Object(Object::from_iter(
vec![
("one", Value::Scalar(DefaultScalarValue::Int(1i32))),
("two", Value::Scalar(DefaultScalarValue::Int(1i32))),
]
.into_iter(),
))),
GraphQLResponse::from_result(Ok((
Value::Object(Object::from_iter(
vec![
("one", Value::Scalar(DefaultScalarValue::Int(2i32))),
("two", Value::Scalar(DefaultScalarValue::Int(2i32))),
]
.into_iter(),
)),
vec![],
ExecutionOutput::from_data(Value::Object(Object::from_iter(
vec![
("one", Value::Scalar(DefaultScalarValue::Int(2i32))),
("two", Value::Scalar(DefaultScalarValue::Int(2i32))),
]
.into_iter(),
))),
];
let expected = serde_json::to_string(&expected).unwrap();
Expand Down