Commit 6baf93d

Ignore waitForActiveShards when syncing leases (#39224)
Adjust the retention lease sync actions so that they do not respect the `index.write.wait_for_active_shards` setting on an index, allowing them to sync retention leases even if insufficiently many shards are currently active to accept writes.

Relates #39089
1 parent: fb7d9a0
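For context, the following is a minimal, self-contained sketch of the pattern this commit applies: the sync request pins its own wait-for-active-shards value to NONE at construction time, and the primary asserts that value, so the index-level `index.write.wait_for_active_shards` setting never gates a retention lease sync. The `SyncRequestSketch` class and the `ActiveShardCount` enum below are simplified stand-ins for illustration only, not the Elasticsearch classes touched by this commit.

// Illustrative sketch only; not the Elasticsearch implementation.
import java.util.Objects;

enum ActiveShardCount { NONE, ONE, ALL }

class SyncRequestSketch {
    // Usual replication default: wait for at least the primary to be active.
    private ActiveShardCount waitForActiveShards = ActiveShardCount.ONE;

    SyncRequestSketch(final String shardId) {
        Objects.requireNonNull(shardId);
        // Pin the value at construction so the index-level setting is never consulted.
        this.waitForActiveShards = ActiveShardCount.NONE;
    }

    ActiveShardCount waitForActiveShards() {
        return waitForActiveShards;
    }
}

public class RetentionLeaseSyncSketch {
    static void shardOperationOnPrimary(final SyncRequestSketch request) {
        // Mirrors the assertion the commit adds on the primary: every sync request carries NONE.
        assert request.waitForActiveShards() == ActiveShardCount.NONE : request.waitForActiveShards();
        // ... persist and replicate retention leases here ...
    }

    public static void main(final String[] args) {
        shardOperationOnPrimary(new SyncRequestSketch("[index][0]"));
        System.out.println("retention lease sync ignores index.write.wait_for_active_shards");
    }
}

Run with assertions enabled (java -ea RetentionLeaseSyncSketch) to exercise the primary-side check.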

File tree: 3 files changed, 120 insertions(+), 5 deletions(-)

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

Lines changed: 3 additions & 0 deletions

@@ -26,6 +26,7 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -123,6 +124,7 @@ public void backgroundSync(
     protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
             final Request request,
             final IndexShard primary) throws WriteStateException {
+        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
         primary.persistRetentionLeases();
@@ -153,6 +155,7 @@ public Request() {
         public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
             super(Objects.requireNonNull(shardId));
             this.retentionLeases = Objects.requireNonNull(retentionLeases);
+            waitForActiveShards(ActiveShardCount.NONE);
         }
 
         @Override

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

Lines changed: 3 additions & 0 deletions

@@ -26,6 +26,7 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -125,6 +126,7 @@ public void sync(
     protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
             final Request request,
             final IndexShard primary) throws WriteStateException {
+        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
         primary.persistRetentionLeases();
@@ -162,6 +164,7 @@ public Request() {
         public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
             super(Objects.requireNonNull(shardId));
             this.retentionLeases = Objects.requireNonNull(retentionLeases);
+            waitForActiveShards(ActiveShardCount.NONE);
         }
 
         @Override

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

Lines changed: 114 additions & 5 deletions

@@ -428,7 +428,7 @@ public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException
     private void runUnderBlockTest(
             final String idForInitialRetentionLease,
             final long initialRetainingSequenceNumber,
-            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
+            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
             final Consumer<IndexShard> afterSync) throws InterruptedException {
         final Settings settings = Settings.builder()
                 .put("index.number_of_shards", 1)
@@ -444,12 +444,10 @@ private void runUnderBlockTest(
                 .getInstance(IndicesService.class, primaryShardNodeName)
                 .getShardOrNull(new ShardId(resolveIndex("index"), 0));
 
-        final String id = idForInitialRetentionLease;
-        final long retainingSequenceNumber = initialRetainingSequenceNumber;
         final String source = randomAlphaOfLength(8);
         final CountDownLatch latch = new CountDownLatch(1);
         final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
-        primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
+        primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
         latch.await();
 
         final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");
@@ -465,7 +463,7 @@ private void runUnderBlockTest(
         final CountDownLatch actionLatch = new CountDownLatch(1);
         final AtomicBoolean success = new AtomicBoolean();
 
-        indexShard.accept(
+        primaryConsumer.accept(
                 primary,
                 new ActionListener<ReplicationResponse>() {
 
@@ -494,4 +492,115 @@ public void onFailure(final Exception e) {
         }
     }
 
+    public void testCanAddRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                randomLongBetween(0, Long.MAX_VALUE),
+                (primary, listener) -> {
+                    final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
+                    final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
+                    final String nextSource = randomAlphaOfLength(8);
+                    primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
+                },
+                primary -> {});
+    }
+
+    public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
+        final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                initialRetainingSequenceNumber,
+                (primary, listener) -> {
+                    final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
+                    final String nextSource = randomAlphaOfLength(8);
+                    retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
+                    listener.onResponse(new ReplicationResponse());
+                },
+                primary -> {
+                    try {
+                        /*
+                         * If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
+                         * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
+                         * implies that the background sync was able to execute despite wait for shards being set on the index.
+                         */
+                        assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
+                    } catch (final Exception e) {
+                        fail(e.toString());
+                    }
+                });
+
+    }
+
+    public void testCanRemoveRetentionLeasesWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                randomLongBetween(0, Long.MAX_VALUE),
+                (primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
+                primary -> {});
+    }
+
+    private void runWaitForShardsTest(
+            final String idForInitialRetentionLease,
+            final long initialRetainingSequenceNumber,
+            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
+            final Consumer<IndexShard> afterSync) throws InterruptedException {
+        final int numDataNodes = internalCluster().numDataNodes();
+        final Settings settings = Settings.builder()
+                .put("index.number_of_shards", 1)
+                .put("index.number_of_replicas", numDataNodes == 1 ? 0 : numDataNodes - 1)
+                .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
+                .build();
+        assertAcked(prepareCreate("index").setSettings(settings));
+        ensureYellowAndNoInitializingShards("index");
+        assertFalse(client().admin().cluster().prepareHealth("index").setWaitForActiveShards(numDataNodes).get().isTimedOut());
+
+        final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
+        final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
+        final IndexShard primary = internalCluster()
+                .getInstance(IndicesService.class, primaryShardNodeName)
+                .getShardOrNull(new ShardId(resolveIndex("index"), 0));
+
+        final String source = randomAlphaOfLength(8);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
+        primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
+        latch.await();
+
+        final String waitForActiveValue = randomBoolean() ? "all" : Integer.toString(numDataNodes);
+
+        client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings("index")
+                .setSettings(Settings.builder().put("index.write.wait_for_active_shards", waitForActiveValue).build())
+                .get();
+
+        final CountDownLatch actionLatch = new CountDownLatch(1);
+        final AtomicBoolean success = new AtomicBoolean();
+
+        primaryConsumer.accept(
+                primary,
+                new ActionListener<ReplicationResponse>() {
+
+                    @Override
+                    public void onResponse(final ReplicationResponse replicationResponse) {
+                        success.set(true);
+                        actionLatch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(final Exception e) {
+                        fail(e.toString());
+                    }
+
+                });
+        actionLatch.await();
+        assertTrue(success.get());
+        afterSync.accept(primary);
+    }
+
 }
