Skip to content

feat: parallelize requests by default #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions helm/templates/serviceconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ data:
graphql.urlPath = {{ .Values.serviceConfig.urlPath }}
graphql.corsEnabled = {{ .Values.serviceConfig.corsEnabled }}
graphql.timeout = {{ .Values.serviceConfig.timeoutDuration }}

threads.io.max = {{ .Values.serviceConfig.threads.io }}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No change here, just exposing existing pool for config with same default.

threads.request.max = {{ .Values.serviceConfig.threads.request }}

attribute.service = {
host = {{ .Values.serviceConfig.attributeService.host }}
Expand Down
3 changes: 3 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ serviceConfig:
corsEnabled: true
defaultTenantId: ""
timeoutDuration: 30s
threads:
io: 10
request: 10
attributeService:
host: attribute-service
port: 9012
Expand Down
3 changes: 3 additions & 0 deletions hypertrace-core-graphql-context/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ dependencies {
implementation(project(":hypertrace-core-graphql-spi"))
implementation("com.google.guava:guava")

annotationProcessor("org.projectlombok:lombok")
compileOnly("org.projectlombok:lombok")

testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.mockito:mockito-core")
testImplementation("org.mockito:mockito-junit-jupiter")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.hypertrace.core.graphql.context;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Injector;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.AllArgsConstructor;
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;

@Singleton
class AsyncDataFetcherFactory {

private final Injector injector;
private final GraphQlServiceConfig config;
private final ExecutorService requestExecutor;

@Inject
AsyncDataFetcherFactory(Injector injector, GraphQlServiceConfig config) {
this.injector = injector;
this.config = config;
this.requestExecutor =
Executors.newFixedThreadPool(
config.getMaxRequestThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("request-handler-%d").build());
}

<T> DataFetcher<CompletableFuture<T>> buildDataFetcher(
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass) {
return new AsyncForwardingDataFetcher<>(
this.injector.getInstance(dataFetcherClass), requestExecutor, config);
}

@AllArgsConstructor
private static class AsyncForwardingDataFetcher<T> implements DataFetcher<CompletableFuture<T>> {
private final DataFetcher<CompletableFuture<T>> delegate;
private final ExecutorService executorService;
private final GraphQlServiceConfig config;

@Override
public CompletableFuture<T> get(DataFetchingEnvironment dataFetchingEnvironment)
throws Exception {
// Really all we're doing here is changing the thread that the future is run on by default
return CompletableFuture.supplyAsync(
() -> {
try {
return delegate
.get(dataFetchingEnvironment)
.get(config.getGraphQlTimeout().toMillis(), MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
executorService);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.hypertrace.core.graphql.context;

import com.google.common.collect.Streams;
import com.google.inject.Injector;
import graphql.kickstart.servlet.context.DefaultGraphQLServletContext;
import graphql.kickstart.servlet.context.DefaultGraphQLServletContextBuilder;
import graphql.kickstart.servlet.context.GraphQLServletContext;
Expand All @@ -10,6 +9,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.inject.Inject;
Expand All @@ -27,12 +27,13 @@ class DefaultGraphQlRequestContextBuilder extends DefaultGraphQLServletContextBu
static final Set<String> TRACING_CONTEXT_HEADER_KEY_PREFIXES =
Set.of("X-B3-", "traceparent", "tracestate");

private final Injector injector;
private final GraphQlServiceConfig serviceConfig;
private final AsyncDataFetcherFactory dataFetcherFactory;

@Inject
DefaultGraphQlRequestContextBuilder(Injector injector, GraphQlServiceConfig serviceConfig) {
this.injector = injector;
DefaultGraphQlRequestContextBuilder(
AsyncDataFetcherFactory dataFetcherFactory, GraphQlServiceConfig serviceConfig) {
this.dataFetcherFactory = dataFetcherFactory;
this.serviceConfig = serviceConfig;
}

Expand Down Expand Up @@ -63,8 +64,10 @@ public Optional<DataLoaderRegistry> getDataLoaderRegistry() {
}

@Override
public <T extends DataFetcher<?>> T constructDataFetcher(Class<T> dataFetcherClass) {
return DefaultGraphQlRequestContextBuilder.this.injector.getInstance(dataFetcherClass);
public <T> DataFetcher<CompletableFuture<T>> constructDataFetcher(
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass) {
return DefaultGraphQlRequestContextBuilder.this.dataFetcherFactory.buildDataFetcher(
dataFetcherClass);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import graphql.schema.DataFetcher;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public interface GraphQlRequestContext extends GraphQLContext {
Expand All @@ -12,7 +13,8 @@ public interface GraphQlRequestContext extends GraphQLContext {
* A tool to create data fetchers via injection container due to limitations in the framework. For
* normal injectable instantiation, do not use this method.
*/
<T extends DataFetcher<?>> T constructDataFetcher(Class<T> dataFetcherClass);
<T> DataFetcher<CompletableFuture<T>> constructDataFetcher(
Class<? extends DataFetcher<CompletableFuture<T>>> dataFetcherClass);

Optional<String> getAuthorizationHeader();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.hypertrace.core.graphql.context;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.when;

import com.google.inject.Guice;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class AsyncDataFetcherFactoryTest {
@Mock GraphQlServiceConfig graphQlServiceConfig;
@Mock DataFetchingEnvironment dataFetchingEnvironment;

@Test
void canBuildAsyncDataFetcher() throws Exception {
when(graphQlServiceConfig.getMaxRequestThreads()).thenReturn(1);
DataFetcher<CompletableFuture<Thread>> fetcher =
new AsyncDataFetcherFactory(Guice.createInjector(), graphQlServiceConfig)
.buildDataFetcher(ThreadEchoingDataFetcher.class);

Thread fetcherThread = fetcher.get(dataFetchingEnvironment).get();

assertNotEquals(Thread.currentThread(), fetcherThread);
assertNotNull(fetcherThread);
}

private static class ThreadEchoingDataFetcher implements DataFetcher<CompletableFuture<Thread>> {
@Override
public CompletableFuture<Thread> get(DataFetchingEnvironment environment) {
return CompletableFuture.completedFuture(Thread.currentThread());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.inject.Injector;
import graphql.schema.DataFetcher;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
Expand All @@ -32,7 +32,7 @@
@ExtendWith(MockitoExtension.class)
class DefaultGraphQlRequestContextBuilderTest {

@Mock Injector mockInjector;
@Mock AsyncDataFetcherFactory mockDataFetcherFactory;
@Mock HttpServletRequest mockRequest;
@Mock HttpServletResponse mockResponse;
@Mock GraphQlServiceConfig mockServiceConfig;
Expand All @@ -43,7 +43,8 @@ class DefaultGraphQlRequestContextBuilderTest {
@BeforeEach
void beforeEach() {
this.contextBuilder =
new DefaultGraphQlRequestContextBuilder(this.mockInjector, this.mockServiceConfig);
new DefaultGraphQlRequestContextBuilder(
this.mockDataFetcherFactory, this.mockServiceConfig);
this.requestContext = this.contextBuilder.build(this.mockRequest, this.mockResponse);
}

Expand All @@ -70,9 +71,9 @@ void delegatesDataLoaderRegistry() {
}

@Test
void canConstructDataFetcher() {
this.requestContext.constructDataFetcher(DataFetcher.class);
verify(this.mockInjector).getInstance(DataFetcher.class);
void canDelegateDataFetcherConstruction() {
this.requestContext.constructDataFetcher(TestDataFetcher.class);
verify(this.mockDataFetcherFactory).buildDataFetcher(TestDataFetcher.class);
}

@Test
Expand Down Expand Up @@ -135,4 +136,6 @@ void returnsLowerCasedTracingHeadersIfAnyMatches() {
"x-b3-parent-trace-id value"),
this.requestContext.getTracingContextHeaders());
}

private interface TestDataFetcher extends DataFetcher<CompletableFuture<String>> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
private static final String DEFAULT_TENANT_ID = "defaultTenantId";

private static final String MAX_IO_THREADS_PROPERTY = "threads.io.max";
private static final String MAX_REQUEST_THREADS_PROPERTY = "threads.request.max";

private static final String ATTRIBUTE_SERVICE_HOST_PROPERTY = "attribute.service.host";
private static final String ATTRIBUTE_SERVICE_PORT_PROPERTY = "attribute.service.port";
Expand All @@ -36,6 +37,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
private final Duration graphQlTimeout;
private final Optional<String> defaultTenantId;
private final int maxIoThreads;
private final int maxRequestThreads;
private final String attributeServiceHost;
private final int attributeServicePort;
private final Duration attributeServiceTimeout;
Expand All @@ -51,6 +53,7 @@ class DefaultGraphQlServiceConfig implements GraphQlServiceConfig {
this.graphQlTimeout = untypedConfig.getDuration(GRAPHQL_TIMEOUT);
this.defaultTenantId = optionallyGet(() -> untypedConfig.getString(DEFAULT_TENANT_ID));
this.maxIoThreads = untypedConfig.getInt(MAX_IO_THREADS_PROPERTY);
this.maxRequestThreads = untypedConfig.getInt(MAX_REQUEST_THREADS_PROPERTY);

this.attributeServiceHost = untypedConfig.getString(ATTRIBUTE_SERVICE_HOST_PROPERTY);
this.attributeServicePort = untypedConfig.getInt(ATTRIBUTE_SERVICE_PORT_PROPERTY);
Expand Down Expand Up @@ -98,6 +101,11 @@ public int getMaxIoThreads() {
return maxIoThreads;
}

@Override
public int getMaxRequestThreads() {
return maxRequestThreads;
}

@Override
public String getAttributeServiceHost() {
return this.attributeServiceHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ graphql.corsEnabled = true
graphql.timeout = 30s

threads.io.max = 10
threads.request.max = 10

attribute.service = {
host = localhost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public interface GraphQlServiceConfig {

Optional<String> getDefaultTenantId();

int getMaxRequestThreads();

int getMaxIoThreads();

String getAttributeServiceHost();
Expand Down