Skip to content

Commit 8bc95dd

Browse files
committed
Apply feedback
1 parent a8a0b66 commit 8bc95dd

File tree

5 files changed

+116
-208
lines changed

5 files changed

+116
-208
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.ElasticsearchException;
2727
import org.elasticsearch.Version;
2828
import org.elasticsearch.action.ActionListener;
29+
import org.elasticsearch.action.NotifyOnceListener;
2930
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
3031
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
3132
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
@@ -71,6 +72,7 @@
7172
import java.util.Map;
7273
import java.util.Optional;
7374
import java.util.Set;
75+
import java.util.function.Consumer;
7476
import java.util.stream.Collectors;
7577

7678
import static java.util.Collections.unmodifiableMap;
@@ -137,6 +139,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
137139
assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
138140
listener.onResponse(new AcknowledgedResponse(true));
139141
} else {
142+
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
140143
threadPool.executor(ThreadPool.Names.MANAGEMENT)
141144
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
142145
ActionListener.wrap(closedBlocksResults ->
@@ -228,7 +231,7 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta
228231
blockedIndices.add(index);
229232
}
230233

231-
logger.debug(() -> new ParameterizedMessage("adding block to indices {}",
234+
logger.info(() -> new ParameterizedMessage("closing indices {}",
232235
blockedIndices.stream().map(Object::toString).collect(Collectors.joining(","))));
233236
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
234237
}
@@ -268,44 +271,27 @@ protected void doRun() throws Exception {
268271
final CountDown countDown = new CountDown(blockedIndices.size());
269272
final ClusterState state = clusterService.state();
270273
for (Index blockedIndex : blockedIndices) {
271-
waitForShardsReadyForClosing(blockedIndex, state, timeout, new ActionListener<AcknowledgedResponse>() {
272-
@Override
273-
public void onResponse(final AcknowledgedResponse result) {
274-
results.put(blockedIndex, result);
275-
processIfFinished();
276-
}
277-
278-
@Override
279-
public void onFailure(final Exception e) {
280-
results.put(blockedIndex, new AcknowledgedResponse(false));
281-
processIfFinished();
282-
}
283-
284-
private void processIfFinished() {
285-
if (countDown.countDown()) {
286-
listener.onResponse(unmodifiableMap(results));
287-
}
274+
waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> {
275+
results.put(blockedIndex, response);
276+
if (countDown.countDown()) {
277+
listener.onResponse(unmodifiableMap(results));
288278
}
289279
});
290280
}
291281
}
292282

293283
private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout,
294-
final ActionListener<AcknowledgedResponse> listener) {
284+
final Consumer<AcknowledgedResponse> onResponse) {
295285
final IndexMetaData indexMetaData = state.metaData().index(index);
296-
if (indexMetaData == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
297-
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
298-
listener.onResponse(new AcknowledgedResponse(true));
286+
if (indexMetaData == null) {
287+
logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
288+
onResponse.accept(new AcknowledgedResponse(true));
299289
return;
300290
}
301291
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
302-
if (indexRoutingTable == null) {
303-
logger.debug("index {} has been blocked before closing but is now deleted, ignoring", index);
304-
listener.onResponse(new AcknowledgedResponse(true));
305-
return;
306-
} else if (indexRoutingTable.allPrimaryShardsUnassigned()) {
307-
logger.debug("index {} has been blocked before closing but is now unassigned, ignoring", index);
308-
listener.onResponse(new AcknowledgedResponse(true));
292+
if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
293+
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
294+
onResponse.accept(new AcknowledgedResponse(true));
309295
return;
310296
}
311297

@@ -316,24 +302,24 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState
316302
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
317303
final IndexShardRoutingTable shardRoutingTable = shard.value;
318304
final ShardId shardId = shardRoutingTable.shardId();
319-
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new ActionListener<ReplicationResponse>() {
305+
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener<ReplicationResponse>() {
320306
@Override
321-
public void onResponse(final ReplicationResponse replicationResponse) {
307+
public void innerOnResponse(final ReplicationResponse replicationResponse) {
322308
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
323309
results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0));
324310
processIfFinished();
325311
}
326312

327313
@Override
328-
public void onFailure(final Exception e) {
314+
public void innerOnFailure(final Exception e) {
329315
results.setOnce(shardId.id(), new AcknowledgedResponse(false));
330316
processIfFinished();
331317
}
332318

333319
private void processIfFinished() {
334320
if (countDown.countDown()) {
335321
final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged);
336-
listener.onResponse(new AcknowledgedResponse(acknowledged));
322+
onResponse.accept(new AcknowledgedResponse(acknowledged));
337323
}
338324
}
339325
});
@@ -383,12 +369,14 @@ static ClusterState closeRoutingTable(final ClusterState currentState, final Map
383369
logger.debug("closing index {} failed, removing index block because: {}", index, result.getValue());
384370
blocks.removeIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
385371
}
372+
} else {
373+
logger.debug("index {} has been closed since it was blocked before closing, ignoring", index);
386374
}
387375
} catch (final IndexNotFoundException e) {
388376
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
389377
}
390378
}
391-
logger.debug("closing indices {}", closedIndices);
379+
logger.info("completed closing of indices {}", closedIndices);
392380
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
393381
}
394382

server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request)
227227

228228
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices.stream()
229229
.collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true))));
230-
return newState;
230+
return allocationService.reroute(newState, "indices closed");
231231
}
232232

233233
public ClusterState openIndices(ClusterState state, OpenIndexRequest request) {

server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java

Lines changed: 32 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,8 @@
1818
*/
1919
package org.elasticsearch.indices.state;
2020

21-
import org.elasticsearch.ElasticsearchWrapperException;
21+
import org.elasticsearch.ExceptionsHelper;
2222
import org.elasticsearch.action.ActionRequestValidationException;
23-
import org.elasticsearch.action.bulk.BulkItemResponse;
24-
import org.elasticsearch.action.bulk.BulkRequestBuilder;
25-
import org.elasticsearch.action.bulk.BulkResponse;
26-
import org.elasticsearch.action.index.IndexRequest;
27-
import org.elasticsearch.action.index.IndexResponse;
2823
import org.elasticsearch.action.support.ActiveShardCount;
2924
import org.elasticsearch.action.support.IndicesOptions;
3025
import org.elasticsearch.cluster.ClusterState;
@@ -34,17 +29,14 @@
3429
import org.elasticsearch.cluster.routing.ShardRouting;
3530
import org.elasticsearch.common.settings.Settings;
3631
import org.elasticsearch.index.IndexNotFoundException;
37-
import org.elasticsearch.index.engine.VersionConflictEngineException;
3832
import org.elasticsearch.indices.IndexClosedException;
39-
import org.elasticsearch.rest.RestStatus;
33+
import org.elasticsearch.test.BackgroundIndexer;
4034
import org.elasticsearch.test.ESIntegTestCase;
4135

4236
import java.util.ArrayList;
4337
import java.util.List;
4438
import java.util.Locale;
4539
import java.util.concurrent.CountDownLatch;
46-
import java.util.concurrent.atomic.AtomicBoolean;
47-
import java.util.concurrent.atomic.AtomicInteger;
4840
import java.util.stream.IntStream;
4941

5042
import static java.util.stream.Collectors.toList;
@@ -102,7 +94,7 @@ public void testCloseIndex() throws Exception {
10294
assertIndexIsClosed(indexName);
10395

10496
assertAcked(client().admin().indices().prepareOpen(indexName));
105-
assertHitCount(client().prepareSearch(indexName).setSize(0).setFetchSource(false).get(), nbDocs);
97+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
10698
}
10799

108100
public void testCloseAlreadyClosedIndex() throws Exception {
@@ -170,90 +162,26 @@ public void testCloseWhileIndexingDocuments() throws Exception {
170162
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
171163
createIndex(indexName);
172164

173-
final int nbDocs = randomIntBetween(10, 50);
174-
final AtomicInteger acknowledgedDocs = new AtomicInteger(nbDocs);
175-
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
176-
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
177-
178-
final CountDownLatch startIndexing = new CountDownLatch(1);
179-
final CountDownLatch startClosing = new CountDownLatch(1);
180-
final AtomicBoolean runIndexing = new AtomicBoolean(true);
181-
182-
final Thread[] threads = new Thread[randomIntBetween(2, 5)];
183-
for (int i = 0; i < threads.length; i++) {
184-
threads[i] = new Thread(() -> {
185-
try {
186-
startIndexing.await();
187-
} catch (InterruptedException e) {
188-
throw new AssertionError(e);
189-
}
190-
while (runIndexing.get()) {
191-
switch (randomIntBetween(0, 2)) {
192-
case 0: // Single doc indexing
193-
{
194-
try {
195-
IndexResponse response = client().prepareIndex(indexName, "_doc").setSource("num", randomInt()).get();
196-
if (response.status() == RestStatus.CREATED) {
197-
acknowledgedDocs.incrementAndGet();
198-
}
199-
} catch (Exception e) {
200-
assertException(e, indexName);
201-
}
202-
break;
203-
}
204-
case 1: // Bulk docs indexing
205-
{
206-
BulkRequestBuilder request = client().prepareBulk(indexName, "_doc");
207-
for (int j = 0; j < randomIntBetween(1, 10); j++) {
208-
request.add(new IndexRequest().source("num", randomInt()));
209-
}
210-
211-
BulkResponse response = request.get();
212-
startClosing.countDown();
213-
for (BulkItemResponse itemResponse : response) {
214-
if (itemResponse.isFailed() == false) {
215-
acknowledgedDocs.incrementAndGet();
216-
} else {
217-
assertException(itemResponse.getFailure().getCause(), indexName);
218-
}
219-
}
220-
break;
221-
}
222-
case 2: // Single doc update
223-
{
224-
try {
225-
int docId = randomIntBetween(0, nbDocs - 1);
226-
client().prepareUpdate(indexName, "_doc", String.valueOf(docId)).setDoc("num", randomInt()).get();
227-
} catch (VersionConflictEngineException e) {
228-
// it's ok to ignore
229-
} catch (Exception e) {
230-
assertException(e, indexName);
231-
}
232-
break;
233-
}
234-
default:
235-
throw new AssertionError("Illegal randomisation branch");
236-
}
237-
}
238-
});
239-
threads[i].start();
240-
}
165+
int nbDocs = 0;
166+
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client())) {
167+
indexer.setAssertNoFailuresOnStop(false);
241168

242-
try {
243-
startIndexing.countDown();
244-
startClosing.await();
169+
waitForDocs(randomIntBetween(10, 50), indexer);
245170
assertAcked(client().admin().indices().prepareClose(indexName));
246-
} finally {
247-
runIndexing.set(false);
248-
}
171+
indexer.stop();
172+
nbDocs += indexer.totalIndexedDocs();
249173

250-
for (Thread thread : threads) {
251-
thread.join();
174+
final Throwable[] failures = indexer.getFailures();
175+
if (failures != null) {
176+
for (Throwable failure : failures) {
177+
assertException(failure, indexName);
178+
}
179+
}
252180
}
253181

254182
assertIndexIsClosed(indexName);
255183
assertAcked(client().admin().indices().prepareOpen(indexName));
256-
assertHitCount(client().prepareSearch(indexName).setSize(0).setFetchSource(false).get(), acknowledgedDocs.get());
184+
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
257185
}
258186

259187
public void testCloseWhileDeletingIndices() throws Exception {
@@ -317,27 +245,29 @@ static void assertIndexIsClosed(final String indexName) {
317245
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
318246
}
319247

320-
static void assertException(final Exception exception, final String indexName) {
321-
if (exception instanceof ElasticsearchWrapperException) {
322-
if (exception.getCause() != null && exception.getCause() instanceof Exception) {
323-
assertException((Exception) exception.getCause(), indexName);
324-
return;
325-
}
326-
}
327-
if (exception instanceof ClusterBlockException) {
328-
ClusterBlockException clusterBlockException = (ClusterBlockException) exception;
248+
static void assertIndexIsOpened(final String indexName) {
249+
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
250+
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
251+
assertThat(clusterState.routingTable().index(indexName), notNullValue());
252+
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false));
253+
}
254+
255+
static void assertException(final Throwable throwable, final String indexName) {
256+
final Throwable t = ExceptionsHelper.unwrapCause(throwable);
257+
if (t instanceof ClusterBlockException) {
258+
ClusterBlockException clusterBlockException = (ClusterBlockException) t;
329259
assertThat(clusterBlockException.blocks(), hasSize(1));
330260
assertThat(clusterBlockException.blocks(), hasItem(MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
331-
} else if (exception instanceof IndexClosedException) {
332-
IndexClosedException indexClosedException = (IndexClosedException) exception;
261+
} else if (t instanceof IndexClosedException) {
262+
IndexClosedException indexClosedException = (IndexClosedException) t;
333263
assertThat(indexClosedException.getIndex(), notNullValue());
334264
assertThat(indexClosedException.getIndex().getName(), equalTo(indexName));
335-
} else if (exception instanceof IndexNotFoundException) {
336-
IndexNotFoundException indexNotFoundException = (IndexNotFoundException) exception;
265+
} else if (t instanceof IndexNotFoundException) {
266+
IndexNotFoundException indexNotFoundException = (IndexNotFoundException) t;
337267
assertThat(indexNotFoundException.getIndex(), notNullValue());
338268
assertThat(indexNotFoundException.getIndex().getName(), equalTo(indexName));
339269
} else {
340-
fail("Unexpected exception: " + exception);
270+
fail("Unexpected exception: " + t);
341271
}
342272
}
343273
}

0 commit comments

Comments
 (0)