Skip to content

feat(coro): update proxy client sample and improve async resolver throughput #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 6 additions & 53 deletions include/graphqlservice/GraphQLService.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,40 +342,14 @@ class [[nodiscard("unnecessary construction")]] AwaitableScalar
std::promise<T> _promise;
};

[[nodiscard("unexpected call")]] bool await_ready() const noexcept
[[nodiscard("unexpected call")]] constexpr bool await_ready() const noexcept
{
return std::visit(
[](const auto& value) noexcept {
using value_type = std::decay_t<decltype(value)>;

if constexpr (std::is_same_v<value_type, T>)
{
return true;
}
else if constexpr (std::is_same_v<value_type, std::future<T>>)
{
using namespace std::literals;

return value.wait_for(0s) != std::future_status::timeout;
}
else if constexpr (std::is_same_v<value_type,
std::shared_ptr<const response::Value>>)
{
return true;
}
},
_value);
return true;
}

void await_suspend(std::coroutine_handle<> h) const
{
std::thread(
[this](std::coroutine_handle<> h) noexcept {
std::get<std::future<T>>(_value).wait();
h.resume();
},
std::move(h))
.detach();
h.resume();
}

[[nodiscard("unnecessary construction")]] T await_resume()
Expand Down Expand Up @@ -472,35 +446,14 @@ class [[nodiscard("unnecessary construction")]] AwaitableObject
std::promise<T> _promise;
};

[[nodiscard("unexpected call")]] bool await_ready() const noexcept
[[nodiscard("unexpected call")]] constexpr bool await_ready() const noexcept
{
return std::visit(
[](const auto& value) noexcept {
using value_type = std::decay_t<decltype(value)>;

if constexpr (std::is_same_v<value_type, T>)
{
return true;
}
else if constexpr (std::is_same_v<value_type, std::future<T>>)
{
using namespace std::literals;

return value.wait_for(0s) != std::future_status::timeout;
}
},
_value);
return true;
}

void await_suspend(std::coroutine_handle<> h) const
{
std::thread(
[this](std::coroutine_handle<> h) noexcept {
std::get<std::future<T>>(_value).wait();
h.resume();
},
std::move(h))
.detach();
h.resume();
}

[[nodiscard("unnecessary construction")]] T await_resume()
Expand Down
128 changes: 121 additions & 7 deletions samples/proxy/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "schema/ProxySchema.h"
#include "schema/QueryObject.h"
#include "schema/ResultsObject.h"

#include "graphqlservice/JSONResponse.h"

Expand All @@ -21,6 +22,7 @@
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <functional>
Expand All @@ -45,13 +47,111 @@ constexpr auto c_port = "8080"sv;
constexpr auto c_target = "/graphql"sv;
constexpr int c_version = 11; // HTTP 1.1

struct AsyncIoWorker : service::RequestState
{
AsyncIoWorker()
: worker { std::make_shared<service::await_worker_thread>() }
{
}

const service::await_async worker;
};

class Results
{
public:
explicit Results(response::Value&& data, std::vector<client::Error> errors) noexcept;

service::AwaitableScalar<std::optional<std::string>> getData(
service::FieldParams&& fieldParams) const;
service::AwaitableScalar<std::optional<std::vector<std::optional<std::string>>>> getErrors(
service::FieldParams&& fieldParams) const;

private:
mutable response::Value m_data;
mutable std::vector<client::Error> m_errors;
};

Results::Results(response::Value&& data, std::vector<client::Error> errors) noexcept
: m_data { std::move(data) }
, m_errors { std::move(errors) }
{
}

service::AwaitableScalar<std::optional<std::string>> Results::getData(
service::FieldParams&& fieldParams) const
{
auto asyncIoWorker = std::static_pointer_cast<AsyncIoWorker>(fieldParams.state);
auto data = std::move(m_data);

// Jump to a worker thread for the resolver where we can run a separate I/O context without
// blocking the I/O context in Query::getRelay. This simulates how you might fan out to
// additional async I/O tasks for sub-field resolvers.
co_await asyncIoWorker->worker;

net::io_context ioc;
auto future = net::co_spawn(
ioc,
[](response::Value&& data) -> net::awaitable<std::optional<std::string>> {
co_return (data.type() == response::Type::Null)
? std::nullopt
: std::make_optional(response::toJSON(std::move(data)));
}(std::move(data)),
net::use_future);

ioc.run();

co_return future.get();
}

service::AwaitableScalar<std::optional<std::vector<std::optional<std::string>>>> Results::getErrors(
service::FieldParams&& fieldParams) const
{
auto asyncIoWorker = std::static_pointer_cast<AsyncIoWorker>(fieldParams.state);
auto errors = std::move(m_errors);

// Jump to a worker thread for the resolver where we can run a separate I/O context without
// blocking the I/O context in Query::getRelay. This simulates how you might fan out to
// additional async I/O tasks for sub-field resolvers.
co_await asyncIoWorker->worker;

net::io_context ioc;
auto future = net::co_spawn(
ioc,
[](std::vector<client::Error> errors)
-> net::awaitable<std::optional<std::vector<std::optional<std::string>>>> {
if (errors.empty())
{
co_return std::nullopt;
}

std::vector<std::optional<std::string>> results { errors.size() };

std::transform(errors.begin(),
errors.end(),
results.begin(),
[](auto& error) noexcept -> std::optional<std::string> {
return error.message.empty()
? std::nullopt
: std::make_optional<std::string>(std::move(error.message));
});

co_return std::make_optional(results);
}(std::move(errors)),
net::use_future);

ioc.run();

co_return future.get();
}

class Query
{
public:
explicit Query(std::string_view host, std::string_view port, std::string_view target,
int version) noexcept;

std::future<std::optional<std::string>> getRelay(std::string&& queryArg,
std::future<std::shared_ptr<proxy::object::Results>> getRelay(std::string&& queryArg,
std::optional<std::string>&& operationNameArg,
std::optional<std::string>&& variablesArg) const;

Expand All @@ -73,7 +173,7 @@ Query::Query(

// Based on:
// https://www.boost.org/doc/libs/1_82_0/libs/beast/example/http/client/awaitable/http_client_awaitable.cpp
std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
std::future<std::shared_ptr<proxy::object::Results>> Query::getRelay(std::string&& queryArg,
std::optional<std::string>&& operationNameArg, std::optional<std::string>&& variablesArg) const
{
response::Value payload { response::Type::Map };
Expand All @@ -99,7 +199,7 @@ std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
const char* port,
const char* target,
int version,
std::string requestBody) -> net::awaitable<std::optional<std::string>> {
std::string requestBody) -> net::awaitable<std::shared_ptr<proxy::object::Results>> {
// These objects perform our I/O. They use an executor with a default completion token
// of use_awaitable. This makes our code easy, but will use exceptions as the default
// error handling, i.e. if the connection drops, we might see an exception.
Expand Down Expand Up @@ -150,7 +250,10 @@ std::future<std::optional<std::string>> Query::getRelay(std::string&& queryArg,
throw boost::system::system_error(ec, "shutdown");
}

co_return std::make_optional<std::string>(std::move(res.body()));
auto [data, errors] = client::parseServiceResponse(response::parseJSON(res.body()));

co_return std::make_shared<proxy::object::Results>(
std::make_shared<Results>(std::move(data), std::move(errors)));
}(m_host.c_str(), m_port.c_str(), m_target.c_str(), m_version, std::move(requestBody)),
net::use_future);

Expand Down Expand Up @@ -179,14 +282,25 @@ int main(int argc, char** argv)
auto variables = serializeVariables(
{ input, ((argc > 1) ? std::make_optional(argv[1]) : std::nullopt) });
auto launch = service::await_async { std::make_shared<service::await_worker_queue>() };
auto state = std::make_shared<AsyncIoWorker>();
auto serviceResponse = client::parseServiceResponse(
service->resolve({ query, GetOperationName(), std::move(variables), launch }).get());
service->resolve({ query, GetOperationName(), std::move(variables), launch, state })
.get());
auto result = client::query::relayQuery::parseResponse(std::move(serviceResponse.data));
auto errors = std::move(serviceResponse.errors);

if (result.relay)
if (result.relay.data)
{
std::cout << "Data: " << *result.relay.data << std::endl;
}

if (result.relay.errors)
{
std::cout << *result.relay << std::endl;
for (const auto& message : *result.relay.errors)
{
std::cerr << "Remote Error: "
<< (message ? std::string_view { *message } : "<empty>"sv) << std::endl;
}
}

if (!errors.empty())
Expand Down
34 changes: 32 additions & 2 deletions samples/proxy/query/ProxyClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ const std::string& GetRequestText() noexcept
# Licensed under the MIT License.

query relayQuery($query: String!, $operationName: String, $variables: String) {
relay(query: $query, operationName: $operationName, variables: $variables)
relay(query: $query, operationName: $operationName, variables: $variables) {
data
errors
}
}
)gql"s;

Expand All @@ -51,6 +54,33 @@ const peg::ast& GetRequestObject() noexcept

using namespace proxy;

template <>
query::relayQuery::Response::relay_Results Response<query::relayQuery::Response::relay_Results>::parse(response::Value&& response)
{
query::relayQuery::Response::relay_Results result;

if (response.type() == response::Type::Map)
{
auto members = response.release<response::MapType>();

for (auto& member : members)
{
if (member.first == R"js(data)js"sv)
{
result.data = ModifiedResponse<std::string>::parse<TypeModifier::Nullable>(std::move(member.second));
continue;
}
if (member.first == R"js(errors)js"sv)
{
result.errors = ModifiedResponse<std::string>::parse<TypeModifier::Nullable, TypeModifier::List, TypeModifier::Nullable>(std::move(member.second));
continue;
}
}
}

return result;
}

namespace query::relayQuery {

const std::string& GetOperationName() noexcept
Expand Down Expand Up @@ -83,7 +113,7 @@ Response parseResponse(response::Value&& response)
{
if (member.first == R"js(relay)js"sv)
{
result.relay = ModifiedResponse<std::string>::parse<TypeModifier::Nullable>(std::move(member.second));
result.relay = ModifiedResponse<query::relayQuery::Response::relay_Results>::parse(std::move(member.second));
continue;
}
}
Expand Down
13 changes: 11 additions & 2 deletions samples/proxy/query/ProxyClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ namespace graphql::client {
/// # Licensed under the MIT License.
///
/// query relayQuery($query: String!, $operationName: String, $variables: String) {
/// relay(query: $query, operationName: $operationName, variables: $variables)
/// relay(query: $query, operationName: $operationName, variables: $variables) {
/// data
/// errors
/// }
/// }
/// </code>
namespace proxy {
Expand Down Expand Up @@ -64,7 +67,13 @@ struct [[nodiscard("unnecessary construction")]] Variables

struct [[nodiscard("unnecessary construction")]] Response
{
std::optional<std::string> relay {};
struct [[nodiscard("unnecessary construction")]] relay_Results
{
std::optional<std::string> data {};
std::optional<std::vector<std::optional<std::string>>> errors {};
};

relay_Results relay {};
};

[[nodiscard("unnecessary conversion")]] Response parseResponse(response::Value&& response);
Expand Down
5 changes: 4 additions & 1 deletion samples/proxy/query/query.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
# Licensed under the MIT License.

query relayQuery($query: String!, $operationName: String, $variables: String) {
relay(query: $query, operationName: $operationName, variables: $variables)
relay(query: $query, operationName: $operationName, variables: $variables) {
data
errors
}
}
3 changes: 3 additions & 0 deletions samples/proxy/schema/ProxySchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ void AddTypesToSchema(const std::shared_ptr<schema::Schema>& schema)
{
auto typeQuery = schema::ObjectType::Make(R"gql(Query)gql"sv, R"md()md"sv);
schema->AddType(R"gql(Query)gql"sv, typeQuery);
auto typeResults = schema::ObjectType::Make(R"gql(Results)gql"sv, R"md()md"sv);
schema->AddType(R"gql(Results)gql"sv, typeResults);

AddQueryDetails(typeQuery, schema);
AddResultsDetails(typeResults, schema);

schema->AddQueryType(typeQuery);
}
Expand Down
2 changes: 2 additions & 0 deletions samples/proxy/schema/ProxySchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace proxy {
namespace object {

class Query;
class Results;

} // namespace object

Expand All @@ -50,6 +51,7 @@ class [[nodiscard("unnecessary construction")]] Operations final
};

void AddQueryDetails(const std::shared_ptr<schema::ObjectType>& typeQuery, const std::shared_ptr<schema::Schema>& schema);
void AddResultsDetails(const std::shared_ptr<schema::ObjectType>& typeResults, const std::shared_ptr<schema::Schema>& schema);

std::shared_ptr<schema::Schema> GetSchema();

Expand Down
Loading