Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
Expand Down Expand Up @@ -76,7 +74,7 @@ public void testBasic() throws Exception {
// all shards
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index).shardStatuses("all")).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
assertThat(shardStores.size(), equalTo(2));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.entrySet()) {
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.getValue()) {
Expand All @@ -98,7 +96,7 @@ public void testBasic() throws Exception {
List<ShardRouting> unassignedShards = clusterState.routingTable().index(index).shardsWithState(ShardRoutingState.UNASSIGNED);
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index)).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses.entrySet()) {
assertThat("must report for one store", storesStatus.getValue().size(), equalTo(1));
Expand All @@ -125,8 +123,7 @@ public void testIndices() throws Exception {
.indices()
.shardStores(Requests.indicesShardStoresRequest().shardStatuses("all"))
.get();
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response
.getStoreStatuses();
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response.getStoreStatuses();
assertThat(shardStatuses.containsKey(index1), equalTo(true));
assertThat(shardStatuses.containsKey(index2), equalTo(true));
assertThat(shardStatuses.get(index1).size(), equalTo(2));
Expand Down Expand Up @@ -181,7 +178,7 @@ public void testCorruptedShards() throws Exception {

assertBusy(() -> { // IndicesClusterStateService#failAndRemoveShard() called asynchronously but we need it to have completed here.
IndicesShardStoresResponse rsp = client().admin().indices().prepareShardStores(index).setShardStatuses("all").get();
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
assertNotNull(shardStatuses);
assertThat(shardStatuses.size(), greaterThan(0));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
Expand Down Expand Up @@ -286,7 +285,7 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
logger.info("--> explicitly promote old primary shard");
final String idxName = "test";
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
.indices()
.prepareShardStores(idxName)
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -110,12 +108,11 @@ private void verifyThenSubmitUpdate(
IndicesShardStoresAction.NAME,
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
new ActionListenerResponseHandler<>(ActionListener.wrap(response -> {
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status = response
.getStoreStatuses();
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> status = response.getStoreStatuses();
Exception e = null;
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
final Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
if (indexStatus == null) {
// The index in the stale primary allocation request was green and hence filtered out by the store status
// request. We ignore it here since the relevant exception will be thrown by the reroute action later on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
}
}

private final ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses;
private final Map<String, Map<Integer, List<StoreStatus>>> storeStatuses;
private final List<Failure> failures;

public IndicesShardStoresResponse(
ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses,
List<Failure> failures
) {
public IndicesShardStoresResponse(Map<String, Map<Integer, List<StoreStatus>>> storeStatuses, List<Failure> failures) {
this.storeStatuses = storeStatuses;
this.failures = failures;
}
Expand All @@ -264,7 +261,7 @@ public IndicesShardStoresResponse(StreamInput in) throws IOException {
* Returns {@link StoreStatus}s
* grouped by their index names and shard ids.
*/
public ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> getStoreStatuses() {
public Map<String, Map<Integer, List<StoreStatus>>> getStoreStatuses() {
return storeStatuses;
}

Expand Down Expand Up @@ -299,7 +296,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

builder.startObject(Fields.INDICES);
for (Map.Entry<String, ImmutableOpenIntMap<List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
for (Map.Entry<String, Map<Integer, List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
builder.startObject(indexShards.getKey());

builder.startObject(Fields.SHARDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.Failure;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.AllocationStatus;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
Expand All @@ -31,8 +34,6 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Tuple;
Expand All @@ -47,8 +48,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -206,81 +209,50 @@ protected synchronized void processAsyncFetch(
}

void finish() {
ImmutableOpenMap.Builder<
String,
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>>> indicesStoreStatusesBuilder =
ImmutableOpenMap.builder();

java.util.List<IndicesShardStoresResponse.Failure> failureBuilder = new ArrayList<>();
Map<String, Map<Integer, List<StoreStatus>>> indicesStatuses = new HashMap<>();
List<Failure> failures = new ArrayList<>();
for (Response fetchResponse : fetchResponses) {
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexStoreStatuses =
indicesStoreStatusesBuilder.get(fetchResponse.shardId.getIndexName());
final ImmutableOpenIntMap.Builder<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexShardsBuilder;
if (indexStoreStatuses == null) {
indexShardsBuilder = ImmutableOpenIntMap.builder();
} else {
indexShardsBuilder = ImmutableOpenIntMap.builder(indexStoreStatuses);
}
java.util.List<IndicesShardStoresResponse.StoreStatus> storeStatuses = indexShardsBuilder.get(
fetchResponse.shardId.id()
);
if (storeStatuses == null) {
storeStatuses = new ArrayList<>();
}
for (NodeGatewayStartedShards response : fetchResponse.responses) {
if (shardExistsInNode(response)) {
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(
fetchResponse.shardId.getIndexName(),
fetchResponse.shardId.id(),
response.getNode()
);
storeStatuses.add(
new IndicesShardStoresResponse.StoreStatus(
response.getNode(),
response.allocationId(),
allocationStatus,
response.storeException()
)
);
var indexName = fetchResponse.shardId.getIndexName();
var shardId = fetchResponse.shardId.id();
var indexStatuses = indicesStatuses.computeIfAbsent(indexName, k -> new HashMap<>());
var storeStatuses = indexStatuses.computeIfAbsent(shardId, k -> new ArrayList<>());

for (NodeGatewayStartedShards r : fetchResponse.responses) {
if (shardExistsInNode(r)) {
var allocationStatus = getAllocationStatus(indexName, shardId, r.getNode());
storeStatuses.add(new StoreStatus(r.getNode(), r.allocationId(), allocationStatus, r.storeException()));
}
}
CollectionUtil.timSort(storeStatuses);
indexShardsBuilder.put(fetchResponse.shardId.id(), storeStatuses);
indicesStoreStatusesBuilder.put(fetchResponse.shardId.getIndexName(), indexShardsBuilder.build());

for (FailedNodeException failure : fetchResponse.failures) {
failureBuilder.add(
new IndicesShardStoresResponse.Failure(
failure.nodeId(),
fetchResponse.shardId.getIndexName(),
fetchResponse.shardId.id(),
failure.getCause()
)
);
failures.add(new Failure(failure.nodeId(), indexName, shardId, failure.getCause()));
}
}
listener.onResponse(
new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder))
);
// make the status structure immutable
indicesStatuses.replaceAll((k, v) -> {
v.replaceAll((s, l) -> {
CollectionUtil.timSort(l);
return List.copyOf(l);
});
return Map.copyOf(v);
});
listener.onResponse(new IndicesShardStoresResponse(Map.copyOf(indicesStatuses), List.copyOf(failures)));
}

private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(
String index,
int shardID,
DiscoveryNode node
) {
private AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
for (ShardRouting shardRouting : routingNodes.node(node.getId())) {
ShardId shardId = shardRouting.shardId();
if (shardId.id() == shardID && shardId.getIndexName().equals(index)) {
if (shardRouting.primary()) {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
return AllocationStatus.PRIMARY;
} else if (shardRouting.assignedToNode()) {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
return AllocationStatus.REPLICA;
} else {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
return AllocationStatus.UNUSED;
}
}
}
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
return AllocationStatus.UNUSED;
}

/**
Expand Down
Loading