Skip to content

Commit 401c45c

Browse files
authored
Lift retention lease expiration to index shard (#38391)
This commit lifts the control of when retention leases are expired to index shard. In this case, we move expiration to an explicit action rather than a side-effect of calling `ReplicationTracker#getRetentionLeases`. This explicit action is invoked on a timer. If any retention leases expire, then we hard sync the retention leases to the replicas. Otherwise, we proceed with a background sync.
1 parent 99192e7 commit 401c45c

File tree

7 files changed

+192
-212
lines changed

7 files changed

+192
-212
lines changed

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
120120
private volatile AsyncRefreshTask refreshTask;
121121
private volatile AsyncTranslogFSync fsyncTask;
122122
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
123-
private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;
123+
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
124124

125125
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
126126
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -197,7 +197,7 @@ public IndexService(
197197
this.refreshTask = new AsyncRefreshTask(this);
198198
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
199199
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
200-
this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
200+
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
201201
rescheduleFsyncTask(indexSettings.getTranslogDurability());
202202
}
203203

@@ -288,7 +288,7 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
288288
fsyncTask,
289289
trimTranslogTask,
290290
globalCheckpointTask,
291-
retentionLeaseBackgroundSyncTask);
291+
retentionLeaseSyncTask);
292292
}
293293
}
294294
}
@@ -770,8 +770,8 @@ private void maybeSyncGlobalCheckpoints() {
770770
sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint");
771771
}
772772

773-
private void backgroundSyncRetentionLeases() {
774-
sync(IndexShard::backgroundSyncRetentionLeases, "retention lease");
773+
private void syncRetentionLeases() {
774+
sync(IndexShard::syncRetentionLeases, "retention lease");
775775
}
776776

777777
private void sync(final Consumer<IndexShard> sync, final String source) {
@@ -794,11 +794,11 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
794794
&& e instanceof IndexShardClosedException == false) {
795795
logger.warn(
796796
new ParameterizedMessage(
797-
"{} failed to execute background {} sync", shard.shardId(), source), e);
797+
"{} failed to execute {} sync", shard.shardId(), source), e);
798798
}
799799
},
800800
ThreadPool.Names.SAME,
801-
"background " + source + " sync");
801+
source + " sync");
802802
} catch (final AlreadyClosedException | IndexShardClosedException e) {
803803
// the shard was closed concurrently, continue
804804
}
@@ -939,15 +939,15 @@ public String toString() {
939939
}
940940
}
941941

942-
final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {
942+
final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask {
943943

944-
AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
944+
AsyncRetentionLeaseSyncTask(final IndexService indexService) {
945945
super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
946946
}
947947

948948
@Override
949949
protected void runInternal() {
950-
indexService.backgroundSyncRetentionLeases();
950+
indexService.syncRetentionLeases();
951951
}
952952

953953
@Override
@@ -957,7 +957,7 @@ protected String getThreadPool() {
957957

958958
@Override
959959
public String toString() {
960-
return "retention_lease_background_sync";
960+
return "retention_lease_sync";
961961
}
962962

963963
}

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,10 @@ public long getRetentionLeaseMillis() {
358358
return retentionLeaseMillis;
359359
}
360360

361+
private void setRetentionLeaseMillis(final TimeValue retentionLease) {
362+
this.retentionLeaseMillis = retentionLease.millis();
363+
}
364+
361365
private volatile boolean warmerEnabled;
362366
private volatile int maxResultWindow;
363367
private volatile int maxInnerResultWindow;
@@ -546,6 +550,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
546550
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
547551
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
548552
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
553+
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING, this::setRetentionLeaseMillis);
549554
}
550555

551556
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2929
import org.elasticsearch.cluster.routing.ShardRouting;
3030
import org.elasticsearch.common.SuppressForbidden;
31+
import org.elasticsearch.common.collect.Tuple;
3132
import org.elasticsearch.common.io.stream.StreamInput;
3233
import org.elasticsearch.common.io.stream.StreamOutput;
3334
import org.elasticsearch.common.io.stream.Writeable;
@@ -155,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
155156
private final LongSupplier currentTimeMillisSupplier;
156157

157158
/**
158-
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
159-
* retention lease sync action, to sync retention leases to replicas.
159+
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
160+
* retention leases to replicas.
160161
*/
161-
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
162+
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
162163

163164
/**
164165
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -177,43 +178,42 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
177178
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
178179

179180
/**
180-
* Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
181-
* and if any have expired, syncs the retention leases to any replicas.
181+
* Get all retention leases tracked on this shard.
182182
*
183183
* @return the retention leases
184184
*/
185185
public RetentionLeases getRetentionLeases() {
186-
final boolean wasPrimaryMode;
187-
final RetentionLeases nonExpiredRetentionLeases;
188-
synchronized (this) {
189-
if (primaryMode) {
190-
// the primary calculates the non-expired retention leases and syncs them to replicas
191-
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
192-
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
193-
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
194-
.leases()
195-
.stream()
196-
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
197-
if (partitionByExpiration.get(true) == null) {
198-
// early out as no retention leases have expired
199-
return retentionLeases;
200-
}
201-
final Collection<RetentionLease> nonExpiredLeases =
202-
partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
203-
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
204-
}
205-
/*
206-
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
207-
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
208-
* non-expired retention leases, instead receiving them on syncs from the primary.
209-
*/
210-
wasPrimaryMode = primaryMode;
211-
nonExpiredRetentionLeases = retentionLeases;
186+
return getRetentionLeases(false).v2();
187+
}
188+
189+
/**
190+
* If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
191+
* expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
192+
* primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the
193+
* expire leases parameter is true, this replication tracker must be in primary mode.
194+
*
195+
* @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
196+
*/
197+
public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
198+
if (expireLeases == false) {
199+
return Tuple.tuple(false, retentionLeases);
212200
}
213-
if (wasPrimaryMode) {
214-
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
201+
assert primaryMode;
202+
// the primary calculates the non-expired retention leases and syncs them to replicas
203+
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
204+
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
205+
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
206+
.leases()
207+
.stream()
208+
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
209+
if (partitionByExpiration.get(true) == null) {
210+
// early out as no retention leases have expired
211+
return Tuple.tuple(false, retentionLeases);
215212
}
216-
return nonExpiredRetentionLeases;
213+
final Collection<RetentionLease> nonExpiredLeases =
214+
partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
215+
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
216+
return Tuple.tuple(true, retentionLeases);
217217
}
218218

219219
/**
@@ -246,7 +246,7 @@ public RetentionLease addRetentionLease(
246246
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
247247
currentRetentionLeases = retentionLeases;
248248
}
249-
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
249+
onAddRetentionLease.accept(currentRetentionLeases, listener);
250250
return retentionLease;
251251
}
252252

@@ -563,7 +563,7 @@ private static long inSyncCheckpointStates(
563563
* @param indexSettings the index settings
564564
* @param operationPrimaryTerm the current primary term
565565
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
566-
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
566+
* @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires
567567
*/
568568
public ReplicationTracker(
569569
final ShardId shardId,
@@ -573,7 +573,7 @@ public ReplicationTracker(
573573
final long globalCheckpoint,
574574
final LongConsumer onGlobalCheckpointUpdated,
575575
final LongSupplier currentTimeMillisSupplier,
576-
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
576+
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
577577
super(shardId, indexSettings);
578578
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
579579
this.shardAllocationId = allocationId;
@@ -585,7 +585,7 @@ public ReplicationTracker(
585585
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
586586
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
587587
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
588-
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
588+
this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
589589
this.pendingInSync = new HashSet<>();
590590
this.routingTable = null;
591591
this.replicationGroup = null;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,13 +1929,26 @@ public void addGlobalCheckpointListener(
19291929
}
19301930

19311931
/**
1932-
* Get all non-expired retention leases tracked on this shard.
1932+
* Get all retention leases tracked on this shard.
19331933
*
19341934
* @return the retention leases
19351935
*/
19361936
public RetentionLeases getRetentionLeases() {
1937+
return getRetentionLeases(false).v2();
1938+
}
1939+
1940+
/**
1941+
* If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
1942+
* expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
1943+
* primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the
1944+
* expire leases parameter is true, this replication tracker must be in primary mode.
1945+
*
1946+
* @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
1947+
*/
1948+
public Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
1949+
assert expireLeases == false || assertPrimaryMode();
19371950
verifyNotClosed();
1938-
return replicationTracker.getRetentionLeases();
1951+
return replicationTracker.getRetentionLeases(expireLeases);
19391952
}
19401953

19411954
public RetentionLeaseStats getRetentionLeaseStats() {
@@ -1993,10 +2006,15 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases
19932006
/**
19942007
* Syncs the current retention leases to all replicas.
19952008
*/
1996-
public void backgroundSyncRetentionLeases() {
2009+
public void syncRetentionLeases() {
19972010
assert assertPrimaryMode();
19982011
verifyNotClosed();
1999-
retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases());
2012+
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
2013+
if (retentionLeases.v1()) {
2014+
retentionLeaseSyncer.sync(shardId, retentionLeases.v2(), ActionListener.wrap(() -> {}));
2015+
} else {
2016+
retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2());
2017+
}
20002018
}
20012019

20022020
/**

0 commit comments

Comments
 (0)