From 083b3e00a8058419f3aed78fa965e9084f043996 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 28 Dec 2018 16:42:51 +0100 Subject: [PATCH 1/2] Force Refresh Listeners when Acquiring all Operation Permits (#36835) * Fixes the issue reproduced in the added tests: * When having open index requests on a shard that are waiting for a refresh, relocating that shard becomes blocked until that refresh happens (which could be never as in the test scenario). --- .../elasticsearch/index/shard/IndexShard.java | 31 +++++- .../shard/IndexShardOperationPermits.java | 2 +- .../index/shard/RefreshListeners.java | 37 +++++++- .../index/shard/RefreshListenersTests.java | 35 +++++++ .../elasticsearch/recovery/RelocationIT.java | 94 +++++++++++++++++++ 5 files changed, 193 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bcf8ba7fa5046..53354f9a7c66a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -635,8 +635,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated(final Consumer consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; + final Releasable forceRefreshes = refreshListeners.forceRefreshes(); try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + forceRefreshes.close(); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : "in-flight operations in progress while moving shard state to relocated"; @@ -667,6 +669,8 @@ public void relocated(final Consumer consumer // Fail primary relocation source and target shards. failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); + } finally { + forceRefreshes.close(); } } @@ -2360,7 +2364,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener verifyNotClosed(); assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); + asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); + } + + private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { + final Releasable forceRefreshes = refreshListeners.forceRefreshes(); + final ActionListener wrappedListener = ActionListener.wrap(r -> { + forceRefreshes.close(); + onPermitAcquired.onResponse(r); + }, e -> { + forceRefreshes.close(); + onPermitAcquired.onFailure(e); + }); + try { + indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit); + } catch (Exception e) { + forceRefreshes.close(); + throw e; + } } private void bumpPrimaryTerm(final long newPrimaryTerm, @@ -2370,7 +2391,7 @@ private void bumpPrimaryTerm(final long newPrimaryTerm, assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null); assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); - indexShardOperationPermits.asyncBlockOperations(new ActionListener() { + asyncBlockOperations(new ActionListener() { @Override public void onFailure(final Exception e) { try { @@ -2463,8 +2484,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final TimeValue timeout) { - innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true, - (listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())); + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, + onPermitAcquired, true, + listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()) + ); } private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 51427dcfa71bf..0a96550034787 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -299,7 +299,7 @@ private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) thr /** * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight). * - * @return the active operation count, or zero when all permits ar eheld + * @return the active operation count, or zero when all permits are held */ int getActiveOperationsCount() { int availablePermits = semaphore.availablePermits(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java index d8a51d58ad956..b4b9e13f7e063 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.translog.Translog; @@ -53,6 +55,13 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, * Is this closed? If true then we won't add more listeners and have flushed all pending listeners. */ private volatile boolean closed = false; + + /** + * Force-refreshes new refresh listeners that are added while {@code >= 0}. Used to prevent becoming blocked on operations waiting for + * refresh during relocation. + */ + private int refreshForcers; + /** * List of refresh listeners. Defaults to null and built on demand because most refresh cycles won't need it. Entries are never removed * from it, rather, it is nulled and rebuilt when needed again. The (hopefully) rare entries that didn't make the current refresh cycle @@ -75,6 +84,32 @@ public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefres this.threadContext = threadContext; } + /** + * Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}. + */ + public Releasable forceRefreshes() { + synchronized (this) { + assert refreshForcers >= 0; + refreshForcers += 1; + } + final RunOnce runOnce = new RunOnce(() -> { + synchronized (RefreshListeners.this) { + assert refreshForcers > 0; + refreshForcers -= 1; + } + }); + if (refreshNeeded()) { + try { + forceRefresh.run(); + } catch (Exception e) { + runOnce.run(); + throw e; + } + } + assert refreshListeners == null; + return () -> runOnce.run(); + } + /** * Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it * forces a refresh and calls the listener immediately as well. @@ -102,7 +137,7 @@ public boolean addOrNotify(Translog.Location location, Consumer listene listeners = new ArrayList<>(); refreshListeners = listeners; } - if (listeners.size() < getMaxRefreshListeners.getAsInt()) { + if (refreshForcers == 0 && listeners.size() < getMaxRefreshListeners.getAsInt()) { ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true); Consumer contextPreservingListener = forced -> { try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index cbc08b19e8a07..d18a8ea5f2cad 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -342,6 +343,40 @@ public void testLotsOfThreads() throws Exception { refresher.cancel(); } + public void testDisallowAddListeners() throws Exception { + assertEquals(0, listeners.pendingCount()); + DummyRefreshListener listener = new DummyRefreshListener(); + assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener)); + engine.refresh("I said so"); + assertFalse(listener.forcedRefresh.get()); + listener.assertNoError(); + + try (Releasable releaseable1 = listeners.forceRefreshes()) { + listener = new DummyRefreshListener(); + assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener)); + assertTrue(listener.forcedRefresh.get()); + listener.assertNoError(); + assertEquals(0, listeners.pendingCount()); + + try (Releasable releaseable2 = listeners.forceRefreshes()) { + listener = new DummyRefreshListener(); + assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener)); + assertTrue(listener.forcedRefresh.get()); + listener.assertNoError(); + assertEquals(0, listeners.pendingCount()); + } + + listener = new DummyRefreshListener(); + assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener)); + assertTrue(listener.forcedRefresh.get()); + listener.assertNoError(); + assertEquals(0, listeners.pendingCount()); + } + + assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new DummyRefreshListener())); + assertEquals(1, listeners.pendingCount()); + } + private Engine.IndexResult index(String id) throws IOException { return index(id, "test"); } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index df097b38a3d30..fc1c71d150d49 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -23,9 +23,12 @@ import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -487,6 +490,97 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr } + public void testRelocateWhileWaitingForRefresh() { + logger.info("--> starting [node1] ..."); + final String node1 = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate("test", Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) // we want to control refreshes + ).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have an actual index"); + client().admin().indices().prepareFlush().execute().actionGet(); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute(); + } + + logger.info("--> start another node"); + final String node2 = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2").execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard from node1 to node2"); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand("test", 0, node1, node2)) + .execute().actionGet(); + + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> verifying count"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + } + + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { + logger.info("--> starting [node1] ..."); + final String node1 = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate("test", Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) // we want to control refreshes + ).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have an actual index"); + client().admin().indices().prepareFlush().execute().actionGet(); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute(); + } + + logger.info("--> start another node"); + final String node2 = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2").execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard from node1 to node2"); + ActionFuture relocationListener = client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand("test", 0, node1, node2)) + .execute(); + logger.info("--> index 100 docs while relocating"); + for (int i = 20; i < 120; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute(); + } + relocationListener.actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> verifying count"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); + } + class RecoveryCorruption implements StubbableTransport.SendRequestBehavior { private final CountDownLatch corruptionCount; From dcfb2b40d063ff5d81f9a78da94d8d9dec1d878d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 28 Dec 2018 18:48:59 +0100 Subject: [PATCH 2/2] fix compilation --- .../test/java/org/elasticsearch/recovery/RelocationIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index fc1c71d150d49..580e7e58d1322 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -530,7 +530,7 @@ public void testRelocateWhileWaitingForRefresh() { logger.info("--> verifying count"); client().admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits(), equalTo(20L)); } public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { @@ -578,7 +578,7 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { logger.info("--> verifying count"); client().admin().indices().prepareRefresh().execute().actionGet(); - assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits(), equalTo(120L)); } class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {