Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# There are many more tests under modules/lang-painless/...moving_fn.yml so they can use painless
---
"Bad window":

- skip:
version: " - 7.1.99"
reason: "calendar_interval added in 7.2"

- do:
catch: /\[window\] must be a positive, non-zero integer\./
search:
rest_total_hits_as_int: true
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
calendar_interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: -1
script: "MovingFunctions.max(values)"

---
"Not under date_histo":

- do:
catch: /moving_fn aggregation \[the_mov_fn\] must have a histogram, date_histogram or auto_date_histogram as parent but doesn't have a parent/
search:
rest_total_hits_as_int: true
body:
size: 0
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 1
script: "MovingFunctions.max(values)"
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*/
public class ActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
protected final ActionListener<? super Response> listener;
private final Writeable.Reader<Response> reader;
private final String executor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.action.resync;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand All @@ -31,8 +32,6 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -191,11 +190,7 @@ public void sync(
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}
new ActionListenerResponseHandler<>(listener, TransportResyncReplicationAction.this::newResponseInstance) {

@Override
public void handleResponse(ResyncReplicationResponse response) {
Expand All @@ -210,11 +205,6 @@ public void handleResponse(ResyncReplicationResponse response) {
}
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.support.single.instance;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -38,7 +39,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -184,29 +184,24 @@ protected void doStart(ClusterState clusterState) {

request.shardId = shardIt.shardId();
DiscoveryNode node = clusterState.nodes().get(shard.currentNodeId());
transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler<Response>() {

@Override
public Response read(StreamInput in) throws IOException {
return newResponse(in);
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
final Throwable cause = exp.unwrapCause();
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || retryOnFailure(exp)) {
retry((Exception) cause);
} else {
listener.onFailure(exp);
transportService.sendRequest(
node,
shardActionName,
request,
transportOptions(),
new ActionListenerResponseHandler<>(listener, TransportInstanceSingleOperationAction.this::newResponse) {
@Override
public void handleException(TransportException exp) {
final Throwable cause = exp.unwrapCause();
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || retryOnFailure(exp)) {
retry((Exception) cause);
} else {
listener.onFailure(exp);
}
}
}
});
);
}

void retry(@Nullable final Exception failure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -24,7 +25,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
Expand All @@ -47,8 +47,6 @@
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -530,22 +528,7 @@ public void onFailure(Exception e) {
transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
req,
new TransportResponseHandler<ActionResponse.Empty>() {
@Override
public ActionResponse.Empty read(StreamInput in) {
return ActionResponse.Empty.INSTANCE;
}

@Override
public void handleResponse(ActionResponse.Empty response) {
reqListener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
reqListener.onFailure(exp);
}
}
new ActionListenerResponseHandler<>(reqListener.map(res -> null), in -> ActionResponse.Empty.INSTANCE)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -100,14 +102,13 @@ void sendRequest(
assert false : "request [" + request + "] has been released already";
throw new AlreadyClosedException("request [" + request + "] has been released already");
}
ActionListener<Void> listener = ActionListener.wrap(() -> {
sendMessage(channel, message, () -> {
try {
messageListener.onRequestSent(node, requestId, action, request, options);
} finally {
request.decRef();
}
});
sendMessage(channel, message, listener);
}

/**
Expand All @@ -134,14 +135,13 @@ void sendResponse(
isHandshake,
compressionScheme
);
ActionListener<Void> listener = ActionListener.wrap(() -> {
sendMessage(channel, message, () -> {
try {
messageListener.onResponseSent(requestId, action, response);
} finally {
response.decRef();
}
});
sendMessage(channel, message, listener);
}

/**
Expand All @@ -157,22 +157,35 @@ void sendErrorResponse(
Version version = Version.min(this.version, nodeVersion);
RemoteTransportException tx = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error);
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId, false, null);
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error));
sendMessage(channel, message, listener);
sendMessage(channel, message, () -> messageListener.onResponseSent(requestId, action, error));
}

private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
final ActionListener<Void> wrappedListener = ActionListener.runBefore(listener, byteStreamOutput::close);
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, Releasable onAfter) throws IOException {
final RecyclerBytesStreamOutput byteStreamOutput;
boolean bufferSuccess = false;
try {
byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
bufferSuccess = true;
} finally {
if (bufferSuccess == false) {
Releasables.closeExpectNoException(onAfter);
}
}
final Releasable release = Releasables.wrap(byteStreamOutput, onAfter);
final BytesReference message;
boolean serializeSuccess = false;
try {
message = networkMessage.serialize(byteStreamOutput);
serializeSuccess = true;
} catch (Exception e) {
logger.warn(() -> "failed to serialize outbound message [" + networkMessage + "]", e);
wrappedListener.onFailure(e);
throw e;
} finally {
if (serializeSuccess == false) {
release.close();
}
}
internalSend(channel, message, networkMessage, wrappedListener);
internalSend(channel, message, networkMessage, ActionListener.wrap(release::close));
}

private void internalSend(
Expand Down Expand Up @@ -230,8 +243,7 @@ private void maybeLogSlowMessage(boolean success) {
}
});
} catch (RuntimeException ex) {
listener.onFailure(ex);
CloseableChannel.closeChannel(channel);
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
Expand Down Expand Up @@ -122,24 +121,10 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
ClusterStateAction.NAME,
request,
TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse read(StreamInput in) throws IOException {
return new ClusterStateResponse(in);
}

@Override
public void handleResponse(ClusterStateResponse response) {
DiscoveryNodes nodes = response.getState().nodes();
contextPreservingActionListener.onResponse(nodes::get);
}

@Override
public void handleException(TransportException exp) {
contextPreservingActionListener.onFailure(exp);
}
}
new ActionListenerResponseHandler<>(
contextPreservingActionListener.map(response -> response.getState().nodes()::get),
ClusterStateResponse::new
)
);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ protected QB createQueryWithInnerQuery(QueryBuilder queryBuilder) {
throw new UnsupportedOperationException();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/90984")
public void testMaxNestedDepth() throws IOException {
QB query = null;
try {
Expand Down
Loading