Skip to content

Commit 0752df0

Browse files
alexshadow007ywelsch
authored andcommitted
Add wait_for_active_shards parameter to index open command (#26682)
Adds the wait_for_active_shards parameter to the index open command. Similar to the index creation command, the index open command will now, by default, wait until the primaries have been allocated. Closes #20937
1 parent a1d049c commit 0752df0

File tree

17 files changed

+284
-41
lines changed

17 files changed

+284
-41
lines changed

core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexClusterStateUpdateRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,26 @@
1818
*/
1919
package org.elasticsearch.action.admin.indices.open;
2020

21+
import org.elasticsearch.action.support.ActiveShardCount;
2122
import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
2223

2324
/**
2425
* Cluster state update request that allows to open one or more indices
2526
*/
2627
public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<OpenIndexClusterStateUpdateRequest> {
2728

29+
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
30+
2831
OpenIndexClusterStateUpdateRequest() {
2932

3033
}
34+
35+
public ActiveShardCount waitForActiveShards() {
36+
return waitForActiveShards;
37+
}
38+
39+
public OpenIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
40+
this.waitForActiveShards = waitForActiveShards;
41+
return this;
42+
}
3143
}

core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
package org.elasticsearch.action.admin.indices.open;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRequestValidationException;
2324
import org.elasticsearch.action.IndicesRequest;
25+
import org.elasticsearch.action.support.ActiveShardCount;
2426
import org.elasticsearch.action.support.IndicesOptions;
2527
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2628
import org.elasticsearch.common.io.stream.StreamInput;
@@ -38,6 +40,7 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
3840

3941
private String[] indices;
4042
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, false, true);
43+
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
4144

4245
public OpenIndexRequest() {
4346
}
@@ -101,17 +104,55 @@ public OpenIndexRequest indicesOptions(IndicesOptions indicesOptions) {
101104
return this;
102105
}
103106

107+
public ActiveShardCount waitForActiveShards() {
108+
return waitForActiveShards;
109+
}
110+
111+
/**
112+
* Sets the number of shard copies that should be active for indices opening to return.
113+
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
114+
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
115+
* wait for all shards (primary and all replicas) to be active before returning.
116+
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
117+
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
118+
* to wait for the desired amount of shard copies to become active before returning.
119+
* Indices opening will only wait up until the timeout value for the number of shard copies
120+
* to be active before returning. Check {@link OpenIndexResponse#isShardsAcknowledged()} to
121+
* determine if the requisite shard copies were all started before returning or timing out.
122+
*
123+
* @param waitForActiveShards number of active shard copies to wait on
124+
*/
125+
public OpenIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
126+
this.waitForActiveShards = waitForActiveShards;
127+
return this;
128+
}
129+
130+
/**
131+
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
132+
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
133+
* to get the ActiveShardCount.
134+
*/
135+
public OpenIndexRequest waitForActiveShards(final int waitForActiveShards) {
136+
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
137+
}
138+
104139
@Override
105140
public void readFrom(StreamInput in) throws IOException {
106141
super.readFrom(in);
107142
indices = in.readStringArray();
108143
indicesOptions = IndicesOptions.readIndicesOptions(in);
144+
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
145+
waitForActiveShards = ActiveShardCount.readFrom(in);
146+
}
109147
}
110148

111149
@Override
112150
public void writeTo(StreamOutput out) throws IOException {
113151
super.writeTo(out);
114152
out.writeStringArray(indices);
115153
indicesOptions.writeIndicesOptions(out);
154+
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
155+
waitForActiveShards.writeTo(out);
156+
}
116157
}
117158
}

core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexRequestBuilder.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.open;
2121

22+
import org.elasticsearch.action.support.ActiveShardCount;
2223
import org.elasticsearch.action.support.IndicesOptions;
2324
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
2425
import org.elasticsearch.client.ElasticsearchClient;
@@ -58,4 +59,32 @@ public OpenIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions)
5859
request.indicesOptions(indicesOptions);
5960
return this;
6061
}
62+
63+
/**
64+
* Sets the number of shard copies that should be active for indices opening to return.
65+
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
66+
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
67+
* wait for all shards (primary and all replicas) to be active before returning.
68+
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
69+
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
70+
* to wait for the desired amount of shard copies to become active before returning.
71+
* Indices opening will only wait up until the timeout value for the number of shard copies
72+
* to be active before returning. Check {@link OpenIndexResponse#isShardsAcknowledged()} to
73+
* determine if the requisite shard copies were all started before returning or timing out.
74+
*
75+
* @param waitForActiveShards number of active shard copies to wait on
76+
*/
77+
public OpenIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
78+
request.waitForActiveShards(waitForActiveShards);
79+
return this;
80+
}
81+
82+
/**
83+
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
84+
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
85+
* to get the ActiveShardCount.
86+
*/
87+
public OpenIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
88+
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
89+
}
6190
}

core/src/main/java/org/elasticsearch/action/admin/indices/open/OpenIndexResponse.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.open;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,22 +31,41 @@
3031
*/
3132
public class OpenIndexResponse extends AcknowledgedResponse {
3233

34+
private boolean shardsAcknowledged;
35+
3336
OpenIndexResponse() {
3437
}
3538

36-
OpenIndexResponse(boolean acknowledged) {
39+
OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
3740
super(acknowledged);
41+
assert acknowledged || shardsAcknowledged == false; // if its not acknowledged, then shards acked should be false too
42+
this.shardsAcknowledged = shardsAcknowledged;
43+
}
44+
45+
/**
46+
* Returns true if the requisite number of shards were started before
47+
* returning from the indices opening operation. If {@link #isAcknowledged()}
48+
* is false, then this also returns false.
49+
*/
50+
public boolean isShardsAcknowledged() {
51+
return shardsAcknowledged;
3852
}
3953

4054
@Override
4155
public void readFrom(StreamInput in) throws IOException {
4256
super.readFrom(in);
4357
readAcknowledged(in);
58+
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
59+
shardsAcknowledged = in.readBoolean();
60+
}
4461
}
4562

4663
@Override
4764
public void writeTo(StreamOutput out) throws IOException {
4865
super.writeTo(out);
4966
writeAcknowledged(out);
67+
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
68+
out.writeBoolean(shardsAcknowledged);
69+
}
5070
}
5171
}

core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.logging.log4j.util.Supplier;
2424
import org.elasticsearch.action.ActionListener;
25-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
2625
import org.elasticsearch.action.support.ActionFilters;
2726
import org.elasticsearch.action.support.DestructiveOperations;
2827
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2928
import org.elasticsearch.cluster.ClusterState;
30-
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
29+
import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
3130
import org.elasticsearch.cluster.block.ClusterBlockException;
3231
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3332
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -84,18 +83,18 @@ protected ClusterBlockException checkBlock(OpenIndexRequest request, ClusterStat
8483
protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) {
8584
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
8685
if (concreteIndices == null || concreteIndices.length == 0) {
87-
listener.onResponse(new OpenIndexResponse(true));
86+
listener.onResponse(new OpenIndexResponse(true, true));
8887
return;
8988
}
9089
OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
9190
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
92-
.indices(concreteIndices);
91+
.indices(concreteIndices).waitForActiveShards(request.waitForActiveShards());
9392

94-
indexStateService.openIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
93+
indexStateService.openIndex(updateRequest, new ActionListener<OpenIndexClusterStateUpdateResponse>() {
9594

9695
@Override
97-
public void onResponse(ClusterStateUpdateResponse response) {
98-
listener.onResponse(new OpenIndexResponse(response.isAcknowledged()));
96+
public void onResponse(OpenIndexClusterStateUpdateResponse response) {
97+
listener.onResponse(new OpenIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged()));
9998
}
10099

101100
@Override

core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void onResponse(IndicesStatsResponse statsResponse) {
136136
rolloverRequest),
137137
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
138138
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
139-
activeShardsObserver.waitForActiveShards(rolloverIndexName,
139+
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
140140
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
141141
rolloverRequest.masterNodeTimeout(),
142142
isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName,

core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -138,36 +138,40 @@ public boolean enoughShardsActive(final int activeShardCount) {
138138

139139
/**
140140
* Returns true iff the given cluster state's routing table contains enough active
141-
* shards for the given index to meet the required shard count represented by this instance.
141+
* shards for the given indices to meet the required shard count represented by this instance.
142142
*/
143-
public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
143+
public boolean enoughShardsActive(final ClusterState clusterState, final String... indices) {
144144
if (this == ActiveShardCount.NONE) {
145145
// not waiting for any active shards
146146
return true;
147147
}
148-
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
149-
if (indexMetaData == null) {
150-
// its possible the index was deleted while waiting for active shard copies,
151-
// in this case, we'll just consider it that we have enough active shard copies
152-
// and we can stop waiting
153-
return true;
154-
}
155-
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
156-
assert indexRoutingTable != null;
157-
if (indexRoutingTable.allPrimaryShardsActive() == false) {
158-
// all primary shards aren't active yet
159-
return false;
160-
}
161-
ActiveShardCount waitForActiveShards = this;
162-
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
163-
waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
164-
}
165-
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
166-
if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
167-
// not enough active shard copies yet
148+
149+
for (final String indexName : indices) {
150+
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
151+
if (indexMetaData == null) {
152+
// its possible the index was deleted while waiting for active shard copies,
153+
// in this case, we'll just consider it that we have enough active shard copies
154+
// and we can stop waiting
155+
continue;
156+
}
157+
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
158+
assert indexRoutingTable != null;
159+
if (indexRoutingTable.allPrimaryShardsActive() == false) {
160+
// all primary shards aren't active yet
168161
return false;
169162
}
163+
ActiveShardCount waitForActiveShards = this;
164+
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
165+
waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
166+
}
167+
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
168+
if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
169+
// not enough active shard copies yet
170+
return false;
171+
}
172+
}
170173
}
174+
171175
return true;
172176
}
173177

core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.node.NodeClosedException;
3030
import org.elasticsearch.threadpool.ThreadPool;
3131

32+
import java.util.Arrays;
3233
import java.util.function.Consumer;
3334
import java.util.function.Predicate;
3435

@@ -50,13 +51,13 @@ public ActiveShardsObserver(final Settings settings, final ClusterService cluste
5051
/**
5152
* Waits on the specified number of active shards to be started before executing the
5253
*
53-
* @param indexName the index to wait for active shards on
54+
* @param indexNames the indices to wait for active shards on
5455
* @param activeShardCount the number of active shards to wait on before returning
5556
* @param timeout the timeout value
5657
* @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first)
5758
* @param onFailure a function that is executed in response to an error occurring during waiting for the active shards
5859
*/
59-
public void waitForActiveShards(final String indexName,
60+
public void waitForActiveShards(final String[] indexNames,
6061
final ActiveShardCount activeShardCount,
6162
final TimeValue timeout,
6263
final Consumer<Boolean> onResult,
@@ -71,10 +72,10 @@ public void waitForActiveShards(final String indexName,
7172

7273
final ClusterState state = clusterService.state();
7374
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
74-
if (activeShardCount.enoughShardsActive(state, indexName)) {
75+
if (activeShardCount.enoughShardsActive(state, indexNames)) {
7576
onResult.accept(true);
7677
} else {
77-
final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexName);
78+
final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexNames);
7879

7980
final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() {
8081
@Override
@@ -84,7 +85,7 @@ public void onNewClusterState(ClusterState state) {
8485

8586
@Override
8687
public void onClusterServiceClose() {
87-
logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName);
88+
logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", Arrays.toString(indexNames));
8889
onFailure.accept(new NodeClosedException(clusterService.localNode()));
8990
}
9091

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.ack;
20+
21+
/**
22+
* A cluster state update response with specific fields for index opening.
23+
*/
24+
public class OpenIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse {
25+
26+
private final boolean shardsAcknowledged;
27+
28+
public OpenIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcknowledged) {
29+
super(acknowledged);
30+
this.shardsAcknowledged = shardsAcknowledged;
31+
}
32+
33+
/**
34+
* Returns whether the requisite number of shard copies started before the completion of the operation.
35+
*/
36+
public boolean isShardsAcknowledged() {
37+
return shardsAcknowledged;
38+
}
39+
}

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request,
209209
final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
210210
onlyCreateIndex(request, ActionListener.wrap(response -> {
211211
if (response.isAcknowledged()) {
212-
activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(),
212+
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
213213
shardsAcked -> {
214214
if (shardsAcked == false) {
215215
logger.debug("[{}] index created, but the operation timed out while waiting for " +

0 commit comments

Comments
 (0)