diff --git a/core/src/main/java/org/elasticsearch/action/ActionRequest.java b/core/src/main/java/org/elasticsearch/action/ActionRequest.java index 769b2e7b5738c..f5f10c7bcfa9d 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ActionRequest.java @@ -34,6 +34,10 @@ public ActionRequest() { // this.listenerThreaded = request.listenerThreaded(); } + public ActionRequest(StreamInput in) throws IOException { + super(in); + } + public abstract ActionRequestValidationException validate(); /** diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 0330bc1fbe88e..39d7f96e8e6f6 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.ingest.ConfigurationUtils; @@ -68,6 +69,18 @@ public SimulatePipelineRequest(BytesReference source, XContentType xContentType) SimulatePipelineRequest() { } + SimulatePipelineRequest(StreamInput in) throws IOException { + super(in); + id = in.readOptionalString(); + verbose = in.readBoolean(); + source = in.readBytesReference(); + if (in.getVersion().onOrAfter(Version.V_5_3_0)) { + xContentType = XContentType.readFrom(in); + } else { + xContentType = XContentFactory.xContentType(source); + } + } + @Override public ActionRequestValidationException validate() { return null; @@ -99,15 +112,7 @@ public XContentType getXContentType() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readOptionalString(); - verbose = in.readBoolean(); - source = in.readBytesReference(); - if (in.getVersion().onOrAfter(Version.V_5_3_0)) { - xContentType = XContentType.readFrom(in); - } else { - xContentType = XContentFactory.xContentType(source); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 3f67007df690d..d660840e9b794 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -40,7 +40,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction requestReader, + IndexNameExpressionResolver indexNameExpressionResolver) { + this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestReader); + } + protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { @@ -51,6 +58,14 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c new TransportHandler()); } + protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader requestReader) { + super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); + transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader, + new TransportHandler()); + } + class TransportHandler implements TransportRequestHandler { @Override diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java index 99c054c4c7810..86a4d3ed95c2f 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.io.stream; import java.io.IOException; +import java.util.function.Supplier; /** * Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown @@ -43,4 +44,12 @@ public interface Streamable { * Write this object's fields to a {@linkplain StreamOutput}. */ void writeTo(StreamOutput out) throws IOException; + + static Writeable.Reader newWriteableReader(Supplier supplier) { + return (StreamInput in) -> { + T request = supplier.get(); + request.readFrom(in); + return request; + }; + } } diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 2e56ff91021a6..fc1af1d876ade 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; @@ -32,15 +34,14 @@ public class RequestHandlerRegistry { private final boolean forceExecution; private final boolean canTripCircuitBreaker; private final String executor; - private final Supplier requestFactory; private final TaskManager taskManager; + private final Writeable.Reader requestReader; - public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, + public RequestHandlerRegistry(String action, Writeable.Reader requestReader, TaskManager taskManager, TransportRequestHandler handler, String executor, boolean forceExecution, boolean canTripCircuitBreaker) { this.action = action; - this.requestFactory = requestFactory; - assert newRequest() != null; + this.requestReader = requestReader; this.handler = handler; this.forceExecution = forceExecution; this.canTripCircuitBreaker = canTripCircuitBreaker; @@ -52,8 +53,8 @@ public String getAction() { return action; } - public Request newRequest() { - return requestFactory.get(); + public Request newRequest(StreamInput in) throws IOException { + return requestReader.read(in); } public void processMessageReceived(Request request, TransportChannel channel) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 689a54fc8daa5..8ff86e593c068 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1475,9 +1475,8 @@ protected String handleRequest(Channel channel, String profileName, final Stream } transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, messageLengthBytes); - final TransportRequest request = reg.newRequest(); + final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(remoteAddress)); - request.readFrom(stream); // in case we throw an exception, i.e. when the limit is hit, we don't want to verify validateRequest(stream, requestId, action); threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 5259fca507e49..e08d89d181f71 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -97,11 +98,11 @@ public String executor() { static class ProxyRequest extends TransportRequest { T wrapped; - Supplier supplier; + Writeable.Reader reader; DiscoveryNode targetNode; - ProxyRequest(Supplier supplier) { - this.supplier = supplier; + ProxyRequest(Writeable.Reader reader) { + this.reader = reader; } ProxyRequest(T wrapped, DiscoveryNode targetNode) { @@ -113,8 +114,7 @@ static class ProxyRequest extends TransportRequest { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); targetNode = new DiscoveryNode(in); - wrapped = supplier.get(); - wrapped.readFrom(in); + wrapped = reader.read(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TransportMessage.java b/core/src/main/java/org/elasticsearch/transport/TransportMessage.java index fa21a51ba2d3f..ecaca73b2db57 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -22,11 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; -public abstract class TransportMessage implements Streamable { +public abstract class TransportMessage implements Streamable, Writeable { private TransportAddress remoteAddress; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java index c42ec24ad15a6..d6072fc9d0aa5 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java @@ -39,6 +39,10 @@ public static class Empty extends TransportRequest { public TransportRequest() { } + public TransportRequest(StreamInput in) throws IOException { + parentTaskId = TaskId.readFromStream(in); + } + /** * Set a reference to task that created this request. */ diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 13034355366cf..a68e319bb2c11 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -32,6 +32,8 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; @@ -709,7 +711,24 @@ public void registerRequestHandler(String act String executor, TransportRequestHandler handler) { handler = interceptor.interceptHandler(action, executor, false, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( - action, requestFactory, taskManager, handler, executor, false, true); + action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true); + registerRequestHandler(reg); + } + + /** + * Registers a new request handler + * + * @param action The action the request handler is associated with + * @param requestReader a callable to be used construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler(String action, String executor, + Writeable.Reader requestReader, + TransportRequestHandler handler) { + handler = interceptor.interceptHandler(action, executor, false, handler); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestReader, taskManager, handler, executor, false, true); registerRequestHandler(reg); } @@ -729,7 +748,28 @@ public void registerRequestHandler(String act TransportRequestHandler handler) { handler = interceptor.interceptHandler(action, executor, forceExecution, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( - action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); + action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker); + registerRequestHandler(reg); + } + + /** + * Registers a new request handler + * + * @param action The action the request handler is associated with + * @param requestReader The request class that will be used to construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler(String action, + String executor, boolean forceExecution, + boolean canTripCircuitBreaker, + Writeable.Reader requestReader, + TransportRequestHandler handler) { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); registerRequestHandler(reg); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java index ecd0256b11068..5cd82be8cb04c 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java @@ -49,8 +49,7 @@ public void testSerialization() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); - SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(); - otherRequest.readFrom(streamInput); + SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(streamInput); assertThat(otherRequest.getId(), equalTo(request.getId())); assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose())); @@ -65,8 +64,7 @@ public void testSerializationWithXContent() throws IOException { request.writeTo(output); StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes); - SimulatePipelineRequest serialized = new SimulatePipelineRequest(); - serialized.readFrom(in); + SimulatePipelineRequest serialized = new SimulatePipelineRequest(in); assertEquals(XContentType.JSON, serialized.getXContentType()); assertEquals("{}", serialized.getSource().utf8ToString()); } @@ -77,8 +75,7 @@ public void testSerializationWithXContentBwc() throws IOException { Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0); try (StreamInput in = StreamInput.wrap(data)) { in.setVersion(version); - SimulatePipelineRequest request = new SimulatePipelineRequest(); - request.readFrom(in); + SimulatePipelineRequest request = new SimulatePipelineRequest(in); assertEquals(XContentType.JSON, request.getXContentType()); assertEquals("{}", request.getSource().utf8ToString()); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e73ad8e439cb8..64f4182550935 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -267,7 +267,7 @@ public void testIsProxyAction() { } public void testIsProxyRequest() { - assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null))); + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null))); assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 98736a7a98ec1..503a7ae1f79d9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -419,8 +419,7 @@ protected void sendRequest(Connection connection, long requestId, String action, RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); BytesStreamOutput bStream = new BytesStreamOutput(); request.writeTo(bStream); - final TransportRequest clonedRequest = reg.newRequest(); - clonedRequest.readFrom(bStream.bytes().streamInput()); + final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); Runnable runnable = new AbstractRunnable() { AtomicBoolean requestSent = new AtomicBoolean();