2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .message .ParameterizedMessage ;
2424import org .apache .lucene .document .Field ;
25- import org .apache .lucene .document .LongPoint ;
2625import org .apache .lucene .document .NumericDocValuesField ;
2726import org .apache .lucene .index .DirectoryReader ;
2827import org .apache .lucene .index .IndexCommit ;
3837import org .apache .lucene .index .SoftDeletesRetentionMergePolicy ;
3938import org .apache .lucene .index .Term ;
4039import org .apache .lucene .search .IndexSearcher ;
41- import org .apache .lucene .search .Query ;
4240import org .apache .lucene .search .ReferenceManager ;
4341import org .apache .lucene .search .SearcherFactory ;
4442import org .apache .lucene .search .SearcherManager ;
@@ -153,6 +151,7 @@ public class InternalEngine extends Engine {
153151 private final CounterMetric numDocUpdates = new CounterMetric ();
154152 private final NumericDocValuesField softDeleteField = Lucene .newSoftDeleteField ();
155153 private final boolean softDeleteEnabled ;
154+ private final SoftDeletesPolicy softDeletesPolicy ;
156155 private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener ;
157156
158157 /**
@@ -177,7 +176,6 @@ public InternalEngine(EngineConfig engineConfig) {
177176 if (engineConfig .isAutoGeneratedIDsOptimizationEnabled () == false ) {
178177 maxUnsafeAutoIdTimestamp .set (Long .MAX_VALUE );
179178 }
180- this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
181179 final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy (
182180 engineConfig .getIndexSettings ().getTranslogRetentionSize ().getBytes (),
183181 engineConfig .getIndexSettings ().getTranslogRetentionAge ().getMillis ()
@@ -199,8 +197,10 @@ public InternalEngine(EngineConfig engineConfig) {
199197 assert translog .getGeneration () != null ;
200198 this .translog = translog ;
201199 this .localCheckpointTracker = createLocalCheckpointTracker (localCheckpointTrackerSupplier );
200+ this .softDeleteEnabled = engineConfig .getIndexSettings ().isSoftDeleteEnabled ();
201+ this .softDeletesPolicy = newSoftDeletesPolicy ();
202202 this .combinedDeletionPolicy =
203- new CombinedDeletionPolicy (logger , translogDeletionPolicy , translog ::getLastSyncedGlobalCheckpoint );
203+ new CombinedDeletionPolicy (logger , translogDeletionPolicy , softDeletesPolicy , translog ::getLastSyncedGlobalCheckpoint );
204204 writer = createWriter ();
205205 bootstrapAppendOnlyInfoFromWriter (writer );
206206 historyUUID = loadHistoryUUID (writer );
@@ -257,6 +257,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
257257 return localCheckpointTrackerSupplier .apply (maxSeqNo , localCheckpoint );
258258 }
259259
260+ private SoftDeletesPolicy newSoftDeletesPolicy () throws IOException {
261+ final Map <String , String > commitUserData = store .readLastCommittedSegmentsInfo ().userData ;
262+ final long lastMinRetainedSeqNo ;
263+ if (commitUserData .containsKey (Engine .MIN_RETAINED_SEQNO )) {
264+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (Engine .MIN_RETAINED_SEQNO ));
265+ } else {
266+ lastMinRetainedSeqNo = Long .parseLong (commitUserData .get (SequenceNumbers .MAX_SEQ_NO )) + 1 ;
267+ }
268+ return new SoftDeletesPolicy (translog ::getLastSyncedGlobalCheckpoint , lastMinRetainedSeqNo ,
269+ engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ());
270+ }
271+
260272 /**
261273 * This reference manager delegates all it's refresh calls to another (internal) SearcherManager
262274 * The main purpose for this is that if we have external refreshes happening we don't issue extra
@@ -468,18 +480,39 @@ public void syncTranslog() throws IOException {
468480 }
469481
470482 @ Override
471- public Closeable acquireTranslogRetentionLock () {
472- return getTranslog ().acquireRetentionLock ( );
483+ public Translog . Snapshot newTranslogSnapshotBetween ( long minSeqNo , long maxSeqNo ) throws IOException {
484+ return getTranslog ().getSnapshotBetween ( minSeqNo , maxSeqNo );
473485 }
474486
487+ /**
488+ * Creates a new history snapshot for reading operations since the provided seqno.
489+ * The returned snapshot can be retrieved from either Lucene index or translog files.
490+ */
475491 @ Override
476- public Translog .Snapshot newTranslogSnapshotBetween (long minSeqNo , long maxSeqNo ) throws IOException {
477- return getTranslog ().getSnapshotBetween (minSeqNo , maxSeqNo );
492+ public Translog .Snapshot readHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
493+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
494+ return newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false );
495+ } else {
496+ return getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE );
497+ }
478498 }
479499
500+ /**
501+ * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
502+ */
480503 @ Override
481- public int estimateTranslogOperationsFromMinSeq (long minSeqNo ) {
482- return getTranslog ().estimateTotalOperationsFromMinSeq (minSeqNo );
504+ public int estimateNumberOfHistoryOperations (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
505+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
506+ try (Translog .Snapshot snapshot =
507+ newLuceneChangesSnapshot (source , mapperService , Math .max (0 , startingSeqNo ), Long .MAX_VALUE , false )) {
508+ return snapshot .totalOperations ();
509+ } catch (IOException ex ) {
510+ maybeFailEngine (source , ex );
511+ throw ex ;
512+ }
513+ } else {
514+ return getTranslog ().estimateTotalOperationsFromMinSeq (startingSeqNo );
515+ }
483516 }
484517
485518 @ Override
@@ -2070,8 +2103,8 @@ private IndexWriterConfig getIndexWriterConfig() {
20702103 MergePolicy mergePolicy = config ().getMergePolicy ();
20712104 if (softDeleteEnabled ) {
20722105 iwc .setSoftDeletesField (Lucene .SOFT_DELETE_FIELD );
2073- mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , this :: softDeletesRetentionQuery ,
2074- new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , this :: softDeletesRetentionQuery , mergePolicy ));
2106+ mergePolicy = new RecoverySourcePruneMergePolicy (SourceFieldMapper .RECOVERY_SOURCE_NAME , softDeletesPolicy :: getRetentionQuery ,
2107+ new SoftDeletesRetentionMergePolicy (Lucene .SOFT_DELETE_FIELD , softDeletesPolicy :: getRetentionQuery , mergePolicy ));
20752108 }
20762109 iwc .setMergePolicy (new ElasticsearchMergePolicy (mergePolicy ));
20772110 iwc .setSimilarity (engineConfig .getSimilarity ());
@@ -2084,20 +2117,6 @@ private IndexWriterConfig getIndexWriterConfig() {
20842117 return iwc ;
20852118 }
20862119
2087- /**
2088- * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
2089- */
2090- private Query softDeletesRetentionQuery () {
2091- ensureOpen ();
2092- // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
2093- final long retainedExtraOps = engineConfig .getIndexSettings ().getSoftDeleteRetentionOperations ();
2094- // Prefer using the global checkpoint which is persisted on disk than an in-memory value.
2095- // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
2096- // then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
2097- final long persistedGlobalCheckpoint = translog .getLastSyncedGlobalCheckpoint ();
2098- return LongPoint .newRangeQuery (SeqNoFieldMapper .NAME , persistedGlobalCheckpoint + 1 - retainedExtraOps , Long .MAX_VALUE );
2099- }
2100-
21012120 /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
21022121 static final class SearchFactory extends EngineSearcherFactory {
21032122 private final Engine .Warmer warmer ;
@@ -2284,6 +2303,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
22842303 commitData .put (SequenceNumbers .MAX_SEQ_NO , Long .toString (localCheckpointTracker .getMaxSeqNo ()));
22852304 commitData .put (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID , Long .toString (maxUnsafeAutoIdTimestamp .get ()));
22862305 commitData .put (HISTORY_UUID_KEY , historyUUID );
2306+ if (softDeleteEnabled ) {
2307+ commitData .put (Engine .MIN_RETAINED_SEQNO , Long .toString (softDeletesPolicy .getMinRetainedSeqNo ()));
2308+ }
22872309 logger .trace ("committing writer with commit data [{}]" , commitData );
22882310 return commitData .entrySet ().iterator ();
22892311 });
@@ -2339,6 +2361,8 @@ public void onSettingsChanged() {
23392361 final IndexSettings indexSettings = engineConfig .getIndexSettings ();
23402362 translogDeletionPolicy .setRetentionAgeInMillis (indexSettings .getTranslogRetentionAge ().getMillis ());
23412363 translogDeletionPolicy .setRetentionSizeInBytes (indexSettings .getTranslogRetentionSize ().getBytes ());
2364+
2365+ softDeletesPolicy .setRetentionOperations (indexSettings .getSoftDeleteRetentionOperations ());
23422366 }
23432367
23442368 public MergeStats getMergeStats () {
@@ -2452,6 +2476,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m
24522476 }
24532477 }
24542478
2479+ @ Override
2480+ public boolean hasCompleteOperationHistory (String source , MapperService mapperService , long startingSeqNo ) throws IOException {
2481+ if (engineConfig .getIndexSettings ().isSoftDeleteEnabled ()) {
2482+ return getMinRetainedSeqNo () <= startingSeqNo ;
2483+ } else {
2484+ final long currentLocalCheckpoint = getLocalCheckpointTracker ().getCheckpoint ();
2485+ final LocalCheckpointTracker tracker = new LocalCheckpointTracker (startingSeqNo , startingSeqNo - 1 );
2486+ try (Translog .Snapshot snapshot = getTranslog ().getSnapshotBetween (startingSeqNo , Long .MAX_VALUE )) {
2487+ Translog .Operation operation ;
2488+ while ((operation = snapshot .next ()) != null ) {
2489+ if (operation .seqNo () != SequenceNumbers .UNASSIGNED_SEQ_NO ) {
2490+ tracker .markSeqNoAsCompleted (operation .seqNo ());
2491+ }
2492+ }
2493+ }
2494+ return tracker .getCheckpoint () >= currentLocalCheckpoint ;
2495+ }
2496+ }
2497+
2498+ /**
2499+ * Returns the minimum seqno that is retained in the Lucene index.
2500+ * Operations whose seq# are at least this value should exist in the Lucene index.
2501+ */
2502+ final long getMinRetainedSeqNo () {
2503+ assert softDeleteEnabled : Thread .currentThread ().getName ();
2504+ return softDeletesPolicy .getMinRetainedSeqNo ();
2505+ }
2506+
2507+ @ Override
2508+ public Closeable acquireRetentionLockForPeerRecovery () {
2509+ final Closeable translogLock = translog .acquireRetentionLock ();
2510+ final Releasable softDeletesLock = softDeletesPolicy .acquireRetentionLock ();
2511+ return () -> IOUtils .close (translogLock , softDeletesLock );
2512+ }
2513+
24552514 @ Override
24562515 public boolean isRecovering () {
24572516 return pendingTranslogRecovery .get ();
0 commit comments