Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator;
Expand Down Expand Up @@ -147,8 +148,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
SharedGroupFactory sharedGroupFactory, RestRequestFactory restRequestFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, restRequestFactory);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.sharedGroupFactory = sharedGroupFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Transport;

Expand Down Expand Up @@ -90,10 +91,11 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
ClusterSettings clusterSettings,
RestRequestFactory restRequestFactory) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings, getSharedGroupFactory(settings)));
clusterSettings, getSharedGroupFactory(settings), restRequestFactory));
}

private SharedGroupFactory getSharedGroupFactory(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()).build();
try (HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(Settings.EMPTY))) {
new SharedGroupFactory(Settings.EMPTY), RestRequest::request)) {
httpServerTransport.start();
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -120,7 +121,7 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
Netty4HttpServerPipeliningTests.this.bigArrays,
Netty4HttpServerPipeliningTests.this.threadPool,
xContentRegistry(), new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings));
new SharedGroupFactory(settings), RestRequest::request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
}
};
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, clusterSettings, new SharedGroupFactory(settings))) {
xContentRegistry(), dispatcher, clusterSettings, new SharedGroupFactory(settings), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (Netty4HttpClient client = new Netty4HttpClient()) {
Expand Down Expand Up @@ -204,15 +204,15 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
public void testBindUnavailableAddress() {
Settings initialSettings = createSettings();
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(initialSettings, networkService, bigArrays, threadPool,
xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(Settings.EMPTY))) {
xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(Settings.EMPTY), RestRequest::request)) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder()
.put("http.port", remoteAddress.getPort())
.put("network.host", remoteAddress.getAddress())
.build();
try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(settings))) {
xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(settings), RestRequest::request)) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
assertEquals(
"Failed to bind to " + NetworkAddress.format(remoteAddress.address()),
Expand Down Expand Up @@ -258,7 +258,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th

try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings,
new SharedGroupFactory(settings))) {
new SharedGroupFactory(settings), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down Expand Up @@ -308,7 +308,7 @@ public void dispatchBadRequest(final RestChannel channel,

try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings))) {
new SharedGroupFactory(settings), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down Expand Up @@ -371,7 +371,7 @@ public void dispatchBadRequest(final RestChannel channel,
NioEventLoopGroup group = new NioEventLoopGroup();
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings))) {
new SharedGroupFactory(settings), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.transport.nio.PageAllocator;
Expand Down Expand Up @@ -86,8 +87,9 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {

public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher, NioGroupFactory nioGroupFactory, ClusterSettings clusterSettings) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
Dispatcher dispatcher, NioGroupFactory nioGroupFactory, ClusterSettings clusterSettings,
RestRequestFactory restRequestFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, restRequestFactory);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.nioGroupFactory = nioGroupFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

Expand Down Expand Up @@ -88,10 +89,11 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
ClusterSettings clusterSettings,
RestRequestFactory restRequestFactory) {
return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME,
() -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry,
dispatcher, getNioGroupFactory(settings), clusterSettings));
dispatcher, getNioGroupFactory(settings), clusterSettings, restRequestFactory));
}

private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
};
try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool,
xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (NioHttpClient client = new NioHttpClient()) {
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testBindUnavailableAddress() {
final Settings initialSettings = createSettings();
try (NioHttpServerTransport transport = new NioHttpServerTransport(initialSettings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder()
Expand All @@ -206,7 +206,7 @@ threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settin
.build();
try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
assertEquals(
"Failed to bind to " + NetworkAddress.format(remoteAddress.address()),
Expand Down Expand Up @@ -243,7 +243,7 @@ public void dispatchBadRequest(final RestChannel channel,

try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down Expand Up @@ -315,7 +315,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th

try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down Expand Up @@ -365,7 +365,7 @@ public void dispatchBadRequest(final RestChannel channel,

try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), RestRequest::request)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -114,11 +115,12 @@ public NetworkModule(Settings settings, List<NetworkPlugin> plugins, ThreadPool
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
ClusterSettings clusterSettings, RestRequestFactory restRequestFactory) {
this.settings = settings;
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings,
restRequestFactory);
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
registerHttpTransport(entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequestFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;

Expand Down Expand Up @@ -75,6 +76,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
protected final ThreadPool threadPool;
protected final Dispatcher dispatcher;
protected final CorsHandler.Config corsConfig;
private RestRequestFactory restRequestFactory;
private final NamedXContentRegistry xContentRegistry;

protected final PortsRange port;
Expand All @@ -90,7 +92,8 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final HttpTracer tracer;

protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) {
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings,
RestRequestFactory restRequestFactory) {
this.settings = settings;
this.networkService = networkService;
this.bigArrays = bigArrays;
Expand All @@ -99,6 +102,7 @@ protected AbstractHttpServerTransport(Settings settings, NetworkService networkS
this.dispatcher = dispatcher;
this.handlingSettings = HttpHandlingSettings.fromSettings(settings);
this.corsConfig = CorsHandler.fromSettings(settings);
this.restRequestFactory = restRequestFactory;

// we can't make the network.bind_host a fallback since we already fall back to http.host hence the extra conditional here
List<String> httpBindHost = SETTING_HTTP_BIND_HOST.get(settings);
Expand Down Expand Up @@ -334,7 +338,7 @@ private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChan
{
RestRequest innerRestRequest;
try {
innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
innerRestRequest = restRequestFactory.createRestRequest(xContentRegistry, httpRequest, httpChannel);
} catch (final RestRequest.ContentTypeHeaderException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
Expand Down
Loading