Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -49,7 +50,7 @@ protected NodeRequest newNodeRequest(NodesHotThreadsRequest request) {
}

@Override
protected NodeHotThreads newNodeResponse(StreamInput in) throws IOException {
protected NodeHotThreads newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeHotThreads(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -52,7 +53,7 @@ protected NodeInfoRequest newNodeRequest(NodesInfoRequest request) {
}

@Override
protected NodeInfo newNodeResponse(StreamInput in) throws IOException {
protected NodeInfo newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeInfo(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected NodeRequest newNodeRequest(NodesReloadSecureSettingsRequest request) {
}

@Override
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodesReloadSecureSettingsResponse.NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -54,7 +55,7 @@ protected NodeStatsRequest newNodeRequest(NodesStatsRequest request) {
}

@Override
protected NodeStats newNodeResponse(StreamInput in) throws IOException {
protected NodeStats newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeStats(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected NodeUsageRequest newNodeRequest(NodesUsageRequest request) {
}

@Override
protected NodeUsage newNodeResponse(StreamInput in) throws IOException {
protected NodeUsage newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeUsage(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeSnapshotStatus newNodeResponse(StreamInput in) throws IOException {
protected NodeSnapshotStatus newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeSnapshotStatus(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -119,7 +120,7 @@ protected ClusterStatsNodeRequest newNodeRequest(ClusterStatsRequest request) {
}

@Override
protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOException {
protected ClusterStatsNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new ClusterStatsNodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected NodeFindDanglingIndexRequest newNodeRequest(FindDanglingIndexRequest r
}

@Override
protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeFindDanglingIndexResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected NodeListDanglingIndicesRequest newNodeRequest(ListDanglingIndicesReque
}

@Override
protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeListDanglingIndicesResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
Expand All @@ -20,11 +21,34 @@
*/
public abstract class BaseNodeResponse extends TransportResponse {

private DiscoveryNode node;
private final DiscoveryNode node;

protected BaseNodeResponse(StreamInput in) throws IOException {
/**
* Read a response from the given stream, re-using the given {@link DiscoveryNode} object if non-null.
*
* On the wire a {@link BaseNodeResponse} message starts with a {@link DiscoveryNode} identifying the original responder. If the sender
* knows the identity of the responder already then we prefer to use that rather than reading the object from the wire, since {@link
* DiscoveryNode} objects are sometimes quite large and yet they're immutable so there's no need to have multiple copies in memory.
*
* @param node the expected remote node, or {@code null} if not known.
*/
protected BaseNodeResponse(StreamInput in, @Nullable DiscoveryNode node) throws IOException {
super(in);
node = new DiscoveryNode(in);
final DiscoveryNode remoteNode = new DiscoveryNode(in);
if (node == null) {
this.node = remoteNode;
} else {
assert remoteNode.equals(node) : remoteNode + " vs " + node;
this.node = node;
}
}

/**
* Read a response from the given stream, with no {@link DiscoveryNode} object re-use. Callers should not use this constructor if the
* local node is known, and instead should call {@link #BaseNodeResponse(StreamInput, DiscoveryNode)}.
*/
protected BaseNodeResponse(StreamInput in) throws IOException {
this(in, null);
}

protected BaseNodeResponse(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void newResponseAsync(

protected abstract NodeRequest newNodeRequest(NodesRequest request);

protected abstract NodeResponse newNodeResponse(StreamInput in) throws IOException;
protected abstract NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException;

protected abstract NodeResponse nodeOperation(NodeRequest request, Task task);

Expand Down Expand Up @@ -218,7 +218,7 @@ void start() {
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return newNodeResponse(in);
return newNodeResponse(in, node);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -34,6 +35,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;

Expand Down Expand Up @@ -282,6 +285,7 @@ void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
assert assertSameNodes(response);
processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
}

Expand All @@ -293,6 +297,17 @@ public void onFailure(Exception e) {
}
processAsyncFetch(null, failures, fetchingRound);
}

private boolean assertSameNodes(BaseNodesResponse<T> response) {
final Map<String, DiscoveryNode> nodesById
= Arrays.stream(nodes).collect(Collectors.toMap(DiscoveryNode::getEphemeralId, Function.identity()));
for (T nodeResponse : response.getNodes()) {
final DiscoveryNode responseNode = nodeResponse.getNode();
final DiscoveryNode localNode = nodesById.get(responseNode.getEphemeralId());
assert localNode == responseNode : "not reference equal: " + localNode + " vs " + responseNode;
}
return true;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeGatewayMetaState newNodeResponse(StreamInput in) throws IOException {
protected NodeGatewayMetaState newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeGatewayMetaState(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeGatewayStartedShards newNodeResponse(StreamInput in) throws IOException {
return new NodeGatewayStartedShards(in);
protected NodeGatewayStartedShards newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
final NodeGatewayStartedShards response = new NodeGatewayStartedShards(in, node);
assert response.getNode() == node;
return response;
}

@Override
Expand Down Expand Up @@ -271,7 +273,11 @@ public static class NodeGatewayStartedShards extends BaseNodeResponse {
private final Exception storeException;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
this(in, null);
}

public NodeGatewayStartedShards(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeStoreFilesMetadata newNodeResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadata(in);
protected NodeStoreFilesMetadata newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
final NodeStoreFilesMetadata nodeStoreFilesMetadata = new NodeStoreFilesMetadata(in, node);
assert nodeStoreFilesMetadata.getNode() == node;
return nodeStoreFilesMetadata;
}

@Override
Expand Down Expand Up @@ -345,10 +347,10 @@ public String getCustomDataPath() {

public static class NodeStoreFilesMetadata extends BaseNodeResponse {

private StoreFilesMetadata storeFilesMetadata;
private final StoreFilesMetadata storeFilesMetadata;

public NodeStoreFilesMetadata(StreamInput in) throws IOException {
super(in);
public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
storeFilesMetadata = new StoreFilesMetadata(in);
}

Expand All @@ -362,7 +364,7 @@ public StoreFilesMetadata storeFilesMetadata() {
}

public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadata(in);
return new NodeStoreFilesMetadata(in, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected NodesResponse newResponse(NodesRequest request, List<NodeResponse> res
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ protected NodeRequest newNodeRequest(NodesRequest request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -143,7 +144,7 @@ protected NodeRequest newNodeRequest(NodesRequest request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ protected TestNodeRequest newNodeRequest(TestNodesRequest request) {
}

@Override
protected TestNodeResponse newNodeResponse(StreamInput in) throws IOException {
protected TestNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new TestNodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected AnalyticsStatsAction.NodeRequest newNodeRequest(AnalyticsStatsAction.R
}

@Override
protected AnalyticsStatsAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected AnalyticsStatsAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new AnalyticsStatsAction.NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected NodesDeprecationCheckAction.NodeRequest newNodeRequest(NodesDeprecatio
}

@Override
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodesDeprecationCheckAction.NodeResponse(in);
}

Expand Down
Loading