Skip to content

Commit d6f5337

Browse files
committed
Close translog view after primary-replica resync (#25862)
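The translog view was being closed too early: it was released as soon as resync() returned, even though the resync continues asynchronously and still reads from the view, which could cause the resync to fail. The view is now closed only when the resync listener completes, on either success or failure. Note: The bug only affects unreleased code. Relates to #24841.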
1 parent b4f3669 commit d6f5337

File tree

5 files changed

+82
-13
lines changed

5 files changed

+82
-13
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
5050
import org.elasticsearch.cluster.routing.ShardRouting;
5151
import org.elasticsearch.common.Booleans;
52-
import org.elasticsearch.common.CheckedBiConsumer;
5352
import org.elasticsearch.common.Nullable;
5453
import org.elasticsearch.common.collect.Tuple;
5554
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -360,7 +359,7 @@ public QueryCachingPolicy getQueryCachingPolicy() {
360359
@Override
361360
public void updateShardState(final ShardRouting newRouting,
362361
final long newPrimaryTerm,
363-
final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
362+
final BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
364363
final long applyingClusterStateVersion,
365364
final Set<String> inSyncAllocationIds,
366365
final IndexShardRoutingTable routingTable,

core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,30 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
7878
this.chunkSize = chunkSize;
7979
}
8080

81-
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
82-
try (Translog.View view = indexShard.acquireTranslogView()) {
81+
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
82+
final Translog.View view = indexShard.acquireTranslogView();
83+
ActionListener<ResyncTask> wrappedListener = new ActionListener<ResyncTask>() {
84+
@Override
85+
public void onResponse(ResyncTask resyncTask) {
86+
try {
87+
view.close();
88+
} catch (IOException e) {
89+
onFailure(e);
90+
}
91+
listener.onResponse(resyncTask);
92+
}
93+
94+
@Override
95+
public void onFailure(Exception e) {
96+
try {
97+
view.close();
98+
} catch (IOException inner) {
99+
e.addSuppressed(inner);
100+
}
101+
listener.onFailure(e);
102+
}
103+
};
104+
try {
83105
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
84106
Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
85107
ShardId shardId = indexShard.shardId();
@@ -96,16 +118,20 @@ public synchronized int totalOperations() {
96118

97119
@Override
98120
public synchronized Translog.Operation next() throws IOException {
99-
if (indexShard.state() != IndexShardState.STARTED) {
100-
assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
101-
throw new IndexShardNotStartedException(shardId, indexShard.state());
121+
IndexShardState state = indexShard.state();
122+
if (state == IndexShardState.CLOSED) {
123+
throw new IndexShardClosedException(shardId);
124+
} else {
125+
assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
102126
}
103127
return snapshot.next();
104128
}
105129
};
106130

107131
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
108-
startingSeqNo, listener);
132+
startingSeqNo, wrappedListener);
133+
} catch (Exception e) {
134+
wrappedListener.onFailure(e);
109135
}
110136
}
111137

core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.routing.RoutingTable;
4242
import org.elasticsearch.cluster.routing.ShardRouting;
4343
import org.elasticsearch.cluster.service.ClusterService;
44-
import org.elasticsearch.common.CheckedBiConsumer;
4544
import org.elasticsearch.common.Nullable;
4645
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4746
import org.elasticsearch.common.inject.Inject;
@@ -87,6 +86,7 @@
8786
import java.util.Set;
8887
import java.util.concurrent.ConcurrentMap;
8988
import java.util.concurrent.TimeUnit;
89+
import java.util.function.BiConsumer;
9090
import java.util.function.Consumer;
9191
import java.util.stream.Collectors;
9292
import java.util.stream.Stream;
@@ -746,7 +746,7 @@ public interface Shard {
746746
*/
747747
void updateShardState(ShardRouting shardRouting,
748748
long primaryTerm,
749-
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
749+
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
750750
long applyingClusterStateVersion,
751751
Set<String> inSyncAllocationIds,
752752
IndexShardRoutingTable routingTable,

core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.index.shard;
2020

21+
import org.apache.lucene.store.AlreadyClosedException;
2122
import org.elasticsearch.action.resync.ResyncReplicationResponse;
2223
import org.elasticsearch.action.support.PlainActionFuture;
2324
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -33,6 +34,7 @@
3334
import java.io.IOException;
3435
import java.nio.ByteBuffer;
3536
import java.util.Collections;
37+
import java.util.concurrent.CountDownLatch;
3638
import java.util.concurrent.atomic.AtomicBoolean;
3739

3840
import static org.hamcrest.Matchers.containsString;
@@ -90,6 +92,49 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
9092
closeShards(shard);
9193
}
9294

95+
public void testSyncerOnClosingShard() throws Exception {
96+
IndexShard shard = newStartedShard(true);
97+
AtomicBoolean syncActionCalled = new AtomicBoolean();
98+
CountDownLatch syncCalledLatch = new CountDownLatch(1);
99+
PrimaryReplicaSyncer.SyncAction syncAction =
100+
(request, parentTask, allocationId, primaryTerm, listener) -> {
101+
logger.info("Sending off {} operations", request.getOperations().size());
102+
syncActionCalled.set(true);
103+
syncCalledLatch.countDown();
104+
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
105+
};
106+
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction);
107+
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
108+
109+
int numDocs = 10;
110+
for (int i = 0; i < numDocs; i++) {
111+
indexDoc(shard, "test", Integer.toString(i));
112+
}
113+
114+
String allocationId = shard.routingEntry().allocationId().getId();
115+
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
116+
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
117+
118+
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
119+
threadPool.generic().execute(() -> {
120+
try {
121+
syncer.resync(shard, fut);
122+
} catch (AlreadyClosedException ace) {
123+
fut.onFailure(ace);
124+
}
125+
});
126+
if (randomBoolean()) {
127+
syncCalledLatch.await();
128+
}
129+
closeShards(shard);
130+
try {
131+
fut.actionGet();
132+
assertTrue("Sync action was not called", syncActionCalled.get());
133+
} catch (AlreadyClosedException | IndexShardClosedException ignored) {
134+
// ignore
135+
}
136+
}
137+
93138
public void testStatusSerialization() throws IOException {
94139
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
95140
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));

core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2626
import org.elasticsearch.cluster.routing.RoutingNode;
2727
import org.elasticsearch.cluster.routing.ShardRouting;
28-
import org.elasticsearch.common.CheckedBiConsumer;
2928
import org.elasticsearch.common.Nullable;
3029
import org.elasticsearch.common.settings.Settings;
3130
import org.elasticsearch.common.unit.TimeValue;
@@ -54,8 +53,8 @@
5453
import java.util.Map;
5554
import java.util.Set;
5655
import java.util.concurrent.ConcurrentMap;
56+
import java.util.function.BiConsumer;
5757
import java.util.function.Consumer;
58-
import java.util.stream.Collectors;
5958

6059
import static java.util.Collections.emptyMap;
6160
import static java.util.Collections.unmodifiableMap;
@@ -345,7 +344,7 @@ public RecoveryState recoveryState() {
345344
@Override
346345
public void updateShardState(ShardRouting shardRouting,
347346
long newPrimaryTerm,
348-
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
347+
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
349348
long applyingClusterStateVersion,
350349
Set<String> inSyncAllocationIds,
351350
IndexShardRoutingTable routingTable,

0 commit comments

Comments
 (0)