From 1bf1cf780298114111d65c226624a30f3340218c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 24 Jul 2017 14:36:53 +0200 Subject: [PATCH 1/3] Close translog view after primary-replica resync The translog view was being closed too early, possibly causing a failed resync --- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../index/shard/PrimaryReplicaSyncer.java | 30 +++++++++++++++++-- .../cluster/IndicesClusterStateService.java | 4 +-- ...actIndicesClusterStateServiceTestCase.java | 5 ++-- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d7350b1cc935f..24c735499737b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -49,7 +49,6 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -367,7 +366,7 @@ public QueryCachingPolicy getQueryCachingPolicy() { @Override public void updateShardState(final ShardRouting newRouting, final long newPrimaryTerm, - final CheckedBiConsumer, IOException> primaryReplicaSyncer, + final BiConsumer> primaryReplicaSyncer, final long applyingClusterStateVersion, final Set inSyncAllocationIds, final IndexShardRoutingTable routingTable, diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index bec263f54702f..d6f25e04fc6ea 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -78,8 +78,30 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests this.chunkSize = chunkSize; } - public void resync(IndexShard indexShard, ActionListener listener) throws IOException { - try (Translog.View view = indexShard.acquireTranslogView()) { + public void resync(IndexShard indexShard, ActionListener listener) { + final Translog.View view = indexShard.acquireTranslogView(); + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ResyncTask resyncTask) { + try { + view.close(); + } catch (IOException e) { + onFailure(e); + } + listener.onResponse(resyncTask); + } + + @Override + public void onFailure(Exception e) { + try { + view.close(); + } catch (IOException inner) { + e.addSuppressed(inner); + } + listener.onFailure(e); + } + }; + try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; Translog.Snapshot snapshot = view.snapshot(startingSeqNo); ShardId shardId = indexShard.shardId(); @@ -105,7 +127,9 @@ public synchronized Translog.Operation next() throws IOException { }; resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, - startingSeqNo, listener); + startingSeqNo, wrappedListener); + } catch (Exception e) { + wrappedListener.onFailure(e); } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 84fa54e4613ed..3c1ee5b841293 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -41,7 +41,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -87,6 +86,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -746,7 +746,7 @@ public interface Shard { */ void updateShardState(ShardRouting shardRouting, long primaryTerm, - CheckedBiConsumer, IOException> primaryReplicaSyncer, + BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, IndexShardRoutingTable routingTable, diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 3c97f26950e1e..208e7443c7daf 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -54,8 +53,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -345,7 +344,7 @@ public RecoveryState recoveryState() { @Override public void updateShardState(ShardRouting shardRouting, long newPrimaryTerm, - CheckedBiConsumer, IOException> primaryReplicaSyncer, + BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, IndexShardRoutingTable routingTable, From 5461a0b0c8c907d8a0199fd81f54acd82f6911f5 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Jul 2017 14:00:11 +0200 Subject: [PATCH 2/3] test and exception handling --- .../index/shard/PrimaryReplicaSyncer.java | 8 ++-- .../shard/PrimaryReplicaSyncerTests.java | 42 +++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index d6f25e04fc6ea..f688844cbdfbc 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -118,9 +118,11 @@ public synchronized int totalOperations() { @Override public synchronized Translog.Operation next() throws IOException { - if (indexShard.state() != IndexShardState.STARTED) { - assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard"; - throw new IndexShardNotStartedException(shardId, indexShard.state()); + IndexShardState state = indexShard.state(); + if (state == IndexShardState.CLOSED) { + throw new IndexShardClosedException(shardId); + } else { + assert state == IndexShardState.STARTED : "resync should only happen on a started shard"; } return snapshot.next(); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d7652d1098128..406457f070455 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.shard; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -90,6 +91,47 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { closeShards(shard); } + public void testSyncerOnClosingShard() throws Exception { + IndexShard shard = newStartedShard(true); + AtomicBoolean syncActionCalled = new AtomicBoolean(); + PrimaryReplicaSyncer.SyncAction syncAction = + (request, parentTask, allocationId, primaryTerm, listener) -> { + logger.info("Sending off {} operations", request.getOperations().size()); + syncActionCalled.set(true); + threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); + }; + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction); + syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately + + int numDocs = 10; + for (int i = 0; i < numDocs; i++) { + indexDoc(shard, "test", Integer.toString(i)); + } + + String allocationId = shard.routingEntry().allocationId().getId(); + shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); + + PlainActionFuture fut = new PlainActionFuture<>(); + threadPool.generic().execute(() -> { + try { + syncer.resync(shard, fut); + } catch (AlreadyClosedException ace) { + fut.onFailure(ace); + } + }); + if (randomBoolean()) { + assertBusy(() -> assertTrue("Sync action was not called", syncActionCalled.get())); + } + closeShards(shard); + try { + fut.actionGet(); + assertTrue("Sync action was not called", syncActionCalled.get()); + } catch (AlreadyClosedException | IndexShardClosedException ignored) { + // ignore + } + } + public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10), randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000)); From 0585c56ca047df22ba364ff5260327f5a10ca6c3 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Jul 2017 14:33:53 +0200 Subject: [PATCH 3/3] assertBusy -> latch --- .../org/elasticsearch/index/shard/PrimaryReplicaSyncer.java | 2 +- .../elasticsearch/index/shard/PrimaryReplicaSyncerTests.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index f688844cbdfbc..b340dbe82cef1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -122,7 +122,7 @@ public synchronized Translog.Operation next() throws IOException { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); } else { - assert state == IndexShardState.STARTED : "resync should only happen on a started shard"; + assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } return snapshot.next(); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 406457f070455..725d39279d27c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.containsString; @@ -94,10 +95,12 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { public void testSyncerOnClosingShard() throws Exception { IndexShard shard = newStartedShard(true); AtomicBoolean syncActionCalled = new AtomicBoolean(); + CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { logger.info("Sending off {} operations", request.getOperations().size()); syncActionCalled.set(true); + syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction); @@ -121,7 +124,7 @@ public void testSyncerOnClosingShard() throws Exception { } }); if (randomBoolean()) { - assertBusy(() -> assertTrue("Sync action was not called", syncActionCalled.get())); + syncCalledLatch.await(); } closeShards(shard); try {