Skip to content

Commit 8a29eb1

Browse files
author
Sachin Kale
committed
Integrate translog cleanup with snapshot deletion and fix primary term deletion logic
Signed-off-by: Sachin Kale <[email protected]>
1 parent e087272 commit 8a29eb1

File tree

6 files changed

+309
-39
lines changed

6 files changed

+309
-39
lines changed

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
116116
}
117117
}
118118

119+
public Supplier<RepositoriesService> getRepositoriesService() {
120+
return this.repositoriesService;
121+
}
122+
119123
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

Lines changed: 174 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.opensearch.index.translog;
1010

11+
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.LatchedActionListener;
1214
import org.opensearch.cluster.service.ClusterService;
1315
import org.opensearch.common.blobstore.BlobMetadata;
1416
import org.opensearch.common.collect.Tuple;
@@ -33,6 +35,9 @@
3335
import java.util.Optional;
3436
import java.util.Set;
3537
import java.util.TreeSet;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicLong;
3641
import java.util.function.BooleanSupplier;
3742
import java.util.function.LongConsumer;
3843
import java.util.function.LongSupplier;
@@ -52,10 +57,13 @@
5257
*/
5358
public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
5459

60+
private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class);
5561
private final Logger logger;
5662
private final Map<Long, String> metadataFilePinnedTimestampMap;
5763
// For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads.
5864
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
65+
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
66+
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
5967

6068
public RemoteFsTimestampAwareTranslog(
6169
TranslogConfig config,
@@ -86,6 +94,7 @@ public RemoteFsTimestampAwareTranslog(
8694
logger = Loggers.getLogger(getClass(), shardId);
8795
this.metadataFilePinnedTimestampMap = new HashMap<>();
8896
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
97+
this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
8998
}
9099

91100
@Override
@@ -165,7 +174,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
165174
return;
166175
}
167176

168-
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles);
177+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
178+
metadataFiles,
179+
metadataFilePinnedTimestampMap,
180+
logger
181+
);
169182

170183
// If index is not deleted, make sure to keep latest metadata file
171184
if (indexDeleted == false) {
@@ -209,7 +222,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
209222
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
210223

211224
// Delete stale primary terms
212-
deleteStaleRemotePrimaryTerms(metadataFiles);
225+
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
213226
} else {
214227
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
215228
}
@@ -259,8 +272,16 @@ protected Set<Long> getGenerationsToBeDeleted(
259272
return generationsToBeDeleted;
260273
}
261274

262-
// Visible for testing
263275
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
276+
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
277+
}
278+
279+
// Visible for testing
280+
protected static List<String> getMetadataFilesToBeDeleted(
281+
List<String> metadataFiles,
282+
Map<Long, String> metadataFilePinnedTimestampMap,
283+
Logger logger
284+
) {
264285
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
265286

266287
// Keep files since last successful run of scheduler
@@ -351,27 +372,167 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
351372
}
352373
}
353374

375+
private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
376+
deleteStaleRemotePrimaryTerms(
377+
metadataFiles,
378+
translogTransferManager,
379+
oldFormatMetadataFilePrimaryTermMap,
380+
minPrimaryTermInRemote,
381+
logger
382+
);
383+
}
384+
354385
/**
355386
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
356387
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
357388
* <br>
358389
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
359390
*/
360-
private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
391+
private static void deleteStaleRemotePrimaryTerms(
392+
List<String> metadataFiles,
393+
TranslogTransferManager translogTransferManager,
394+
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
395+
AtomicLong minPrimaryTermInRemote,
396+
Logger logger
397+
) {
361398
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
362399
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
363400
// of older primary term.
364-
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
365-
if (metadataFiles.isEmpty()) {
366-
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
367-
return;
401+
if (metadataFiles.isEmpty()) {
402+
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
403+
return;
404+
}
405+
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> {
406+
try {
407+
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
408+
} catch (IOException e) {
409+
return Long.MAX_VALUE;
368410
}
369-
Optional<Long> minPrimaryTerm = metadataFiles.stream()
370-
.map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1]))
371-
.min(Long::compareTo);
372-
// First we delete all stale primary terms folders from remote store
373-
long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1;
411+
}).min(Long::compareTo);
412+
// First we delete all stale primary terms folders from remote store
413+
long minimumReferencedPrimaryTerm = minPrimaryTermFromMetadataFiles.get() - 1;
414+
Long minPrimaryTerm = getMinPrimaryTermInRemote(minPrimaryTermInRemote, translogTransferManager, logger);
415+
if (minimumReferencedPrimaryTerm > minPrimaryTerm) {
374416
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
417+
minPrimaryTermInRemote.set(minimumReferencedPrimaryTerm);
418+
} else {
419+
logger.debug(
420+
"Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}",
421+
minimumReferencedPrimaryTerm,
422+
minPrimaryTermInRemote
423+
);
375424
}
376425
}
426+
427+
private static Long getMinPrimaryTermInRemote(
428+
AtomicLong minPrimaryTermInRemote,
429+
TranslogTransferManager translogTransferManager,
430+
Logger logger
431+
) {
432+
if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) {
433+
CountDownLatch latch = new CountDownLatch(1);
434+
translogTransferManager.listPrimaryTermsInRemoteAsync(new LatchedActionListener<>(new ActionListener<>() {
435+
@Override
436+
public void onResponse(Set<Long> primaryTermsInRemote) {
437+
Optional<Long> minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo);
438+
minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set);
439+
}
440+
441+
@Override
442+
public void onFailure(Exception e) {
443+
logger.error("Exception while fetching min primary term from remote translog", e);
444+
}
445+
}, latch));
446+
447+
try {
448+
if (latch.await(5, TimeUnit.MINUTES) == false) {
449+
logger.error("Timeout while fetching min primary term from remote translog");
450+
}
451+
} catch (InterruptedException e) {
452+
logger.error("Exception while fetching min primary term from remote translog", e);
453+
}
454+
}
455+
return minPrimaryTermInRemote.get();
456+
}
457+
458+
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
459+
String metadataFile,
460+
TranslogTransferManager translogTransferManager,
461+
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
462+
) throws IOException {
463+
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
464+
if (minMaxPrimaryTermFromFileName != null) {
465+
return minMaxPrimaryTermFromFileName;
466+
} else {
467+
if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
468+
return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
469+
} else {
470+
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
471+
long maxPrimaryTem = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
472+
long minPrimaryTem = -1;
473+
if (metadata.getGenerationToPrimaryTermMapper() != null
474+
&& metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
475+
Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
476+
.values()
477+
.stream()
478+
.map(s -> Long.parseLong(s))
479+
.min(Long::compareTo);
480+
if (primaryTerm.isPresent()) {
481+
minPrimaryTem = primaryTerm.get();
482+
}
483+
}
484+
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
485+
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
486+
return minMaxPrimaryTermTuple;
487+
}
488+
}
489+
}
490+
491+
public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
492+
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
493+
@Override
494+
public void onResponse(List<BlobMetadata> blobMetadata) {
495+
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
496+
497+
try {
498+
if (metadataFiles.isEmpty()) {
499+
staticLogger.debug("No stale translog metadata files found");
500+
return;
501+
}
502+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
503+
if (metadataFilesToBeDeleted.isEmpty()) {
504+
staticLogger.debug("No metadata files to delete");
505+
return;
506+
}
507+
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
508+
509+
// For all the files that we are keeping, fetch min and max generations
510+
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
511+
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
512+
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
513+
514+
// Delete stale metadata files
515+
translogTransferManager.deleteMetadataFilesAsync(
516+
metadataFilesToBeDeleted,
517+
// Delete stale primary terms
518+
() -> deleteStaleRemotePrimaryTerms(
519+
metadataFilesNotToBeDeleted,
520+
translogTransferManager,
521+
new HashMap<>(),
522+
new AtomicLong(Long.MAX_VALUE),
523+
staticLogger
524+
)
525+
);
526+
} catch (Exception e) {
527+
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
528+
}
529+
}
530+
531+
@Override
532+
public void onFailure(Exception e) {
533+
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
534+
}
535+
};
536+
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
537+
}
377538
}

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,23 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
520520
*/
521521
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
522522
logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep);
523+
listPrimaryTermsInRemoteAsync(new ActionListener<>() {
524+
@Override
525+
public void onResponse(Set<Long> primaryTermsInRemote) {
526+
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
527+
.filter(term -> term < minPrimaryTermToKeep)
528+
.collect(Collectors.toSet());
529+
primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
530+
}
531+
532+
@Override
533+
public void onFailure(Exception e) {
534+
logger.error("Exception occurred while deleting primary terms from remote store", e);
535+
}
536+
});
537+
}
538+
539+
public void listPrimaryTermsInRemoteAsync(ActionListener<Set<Long>> listener) {
523540
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
524541
@Override
525542
public void onResponse(Set<String> folders) {
@@ -532,15 +549,13 @@ public void onResponse(Set<String> folders) {
532549
}
533550
return false;
534551
}).map(Long::parseLong).collect(Collectors.toSet());
535-
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
536-
.filter(term -> term < minPrimaryTermToKeep)
537-
.collect(Collectors.toSet());
538-
primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
552+
listener.onResponse(primaryTermsInRemote);
539553
}
540554

541555
@Override
542556
public void onFailure(Exception e) {
543557
logger.error("Exception occurred while getting primary terms from remote store", e);
558+
listener.onFailure(e);
544559
}
545560
});
546561
}

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Arrays;
2020
import java.util.Map;
2121
import java.util.Objects;
22+
import java.util.Optional;
2223

2324
/**
2425
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
108109
RemoteStoreUtils.invertLong(createdAt),
109110
String.valueOf(Objects.hash(nodeId)),
110111
RemoteStoreUtils.invertLong(minTranslogGeneration),
112+
String.valueOf(getMinPrimaryTermReferred()),
111113
String.valueOf(CURRENT_VERSION)
112114
)
113115
);
114116
}
115117

118+
private long getMinPrimaryTermReferred() {
119+
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
120+
return -1;
121+
}
122+
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
123+
.values()
124+
.stream()
125+
.map(s -> Long.parseLong(s))
126+
.min(Long::compareTo);
127+
if (minPrimaryTerm.isPresent()) {
128+
return minPrimaryTerm.get();
129+
} else {
130+
return -1;
131+
}
132+
}
133+
116134
public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
117135
String[] tokens = filename.split(METADATA_SEPARATOR);
118136
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
143161
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
144162
try {
145163
// instead of direct index, we go backwards to avoid running into same separator in nodeId
146-
String minGeneration = tokens[tokens.length - 2];
164+
String minGeneration = tokens[tokens.length - 3];
147165
String maxGeneration = tokens[2];
148166
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
149-
} catch (NumberFormatException e) {
167+
} catch (Exception e) {
150168
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
151169
return null;
152170
}
153171
}
154172

173+
public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
174+
String[] tokens = filename.split(METADATA_SEPARATOR);
175+
if (tokens.length < 7) {
176+
// For versions < 2.17, we don't have min primary term.
177+
return null;
178+
}
179+
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
180+
try {
181+
// instead of direct index, we go backwards to avoid running into same separator in nodeId
182+
String minPrimaryTerm = tokens[tokens.length - 2];
183+
String maxPrimaryTerm = tokens[1];
184+
return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
185+
} catch (Exception e) {
186+
logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
187+
return null;
188+
}
189+
}
190+
191+
public static long getPrimaryTermFromFileName(String filename) {
192+
String[] tokens = filename.split(METADATA_SEPARATOR);
193+
try {
194+
return RemoteStoreUtils.invertLong(tokens[1]);
195+
} catch (Exception e) {
196+
logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
197+
return -1;
198+
}
199+
}
200+
155201
@Override
156202
public int hashCode() {
157203
return Objects.hash(primaryTerm, generation);

0 commit comments

Comments
 (0)