Skip to content

Commit 5b9864d

Browse files
Better Incrementality for Snapshots of Unchanged Shards (#52182) (#53984)
Use sequence numbers and the force-merge UUID to determine whether a shard has changed, before falling back to comparing files, so that snapshots remain incremental after a primary fail-over.
1 parent 8b9d6e6 commit 5b9864d

File tree

15 files changed

+397
-101
lines changed

15 files changed

+397
-101
lines changed

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ static final class Fields {
145145

146146
static final class ParseFields {
147147
static final ParseField FILES = new ParseField("files");
148+
static final ParseField SHARD_STATE_ID = new ParseField("shard_state_id");
148149
static final ParseField SNAPSHOTS = new ParseField("snapshots");
149150
}
150151

@@ -217,6 +218,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
217218
builder.value(fileInfo.name());
218219
}
219220
builder.endArray();
221+
if (snapshot.shardStateIdentifier() != null) {
222+
builder.field(ParseFields.SHARD_STATE_ID.getPreferredName(), snapshot.shardStateIdentifier());
223+
}
220224
builder.endObject();
221225
}
222226
builder.endObject();
@@ -229,6 +233,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
229233
token = parser.nextToken();
230234
}
231235
Map<String, List<String>> snapshotsMap = new HashMap<>();
236+
Map<String, String> historyUUIDs = new HashMap<>();
237+
Map<String, Long> globalCheckpoints = new HashMap<>();
232238
Map<String, FileInfo> files = new HashMap<>();
233239
if (token == XContentParser.Token.START_OBJECT) {
234240
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -260,15 +266,16 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
260266
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
261267
if (token == XContentParser.Token.FIELD_NAME) {
262268
currentFieldName = parser.currentName();
263-
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
264-
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) == false) {
265-
throw new ElasticsearchParseException("unknown array [{}]", currentFieldName);
266-
}
269+
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) &&
270+
parser.nextToken() == XContentParser.Token.START_ARRAY) {
267271
List<String> fileNames = new ArrayList<>();
268272
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
269273
fileNames.add(parser.text());
270274
}
271275
snapshotsMap.put(snapshot, fileNames);
276+
} else if (ParseFields.SHARD_STATE_ID.match(currentFieldName, parser.getDeprecationHandler())) {
277+
parser.nextToken();
278+
historyUUIDs.put(snapshot, parser.text());
272279
}
273280
}
274281
}
@@ -287,7 +294,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
287294
assert fileInfo != null;
288295
fileInfosBuilder.add(fileInfo);
289296
}
290-
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder)));
297+
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder),
298+
historyUUIDs.get(entry.getKey())));
291299
}
292300
return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots));
293301
}

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.index.snapshots.blobstore;
2020

21+
import org.elasticsearch.common.Nullable;
2122
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
2223

2324
import java.util.HashMap;
@@ -33,6 +34,9 @@ public class SnapshotFiles {
3334

3435
private final List<FileInfo> indexFiles;
3536

37+
@Nullable
38+
private final String shardStateIdentifier;
39+
3640
private Map<String, FileInfo> physicalFiles = null;
3741

3842
/**
@@ -45,12 +49,23 @@ public String snapshot() {
4549
}
4650

4751
/**
48-
* @param snapshot snapshot name
49-
* @param indexFiles index files
52+
* @param snapshot snapshot name
53+
* @param indexFiles index files
54+
* @param shardStateIdentifier unique identifier for the state of the shard that this snapshot was taken from
5055
*/
51-
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles ) {
56+
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles, @Nullable String shardStateIdentifier) {
5257
this.snapshot = snapshot;
5358
this.indexFiles = indexFiles;
59+
this.shardStateIdentifier = shardStateIdentifier;
60+
}
61+
62+
/**
63+
* Returns an identifier for the shard state that can be used to check whether a shard has changed between
64+
* snapshots or not.
65+
*/
66+
@Nullable
67+
public String shardStateIdentifier() {
68+
return shardStateIdentifier;
5469
}
5570

5671
/**

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ public boolean isReadOnly() {
126126

127127
@Override
128128
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
129-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
130-
Map<String, Object> userMetadata, ActionListener<String> listener) {
131-
in.snapshotShard(
132-
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener);
129+
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
130+
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
131+
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus,
132+
repositoryMetaVersion, userMetadata, listener);
133133
}
134134
@Override
135135
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.metadata.MetaData;
2828
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
30+
import org.elasticsearch.common.Nullable;
3031
import org.elasticsearch.common.component.LifecycleComponent;
3132
import org.elasticsearch.index.mapper.MapperService;
3233
import org.elasticsearch.index.shard.ShardId;
@@ -212,14 +213,17 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations,
212213
* @param snapshotId snapshot id
213214
* @param indexId id for the index being snapshotted
214215
* @param snapshotIndexCommit commit point
216+
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
217+
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
218+
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
215219
* @param snapshotStatus snapshot status
216220
* @param repositoryMetaVersion version of the updated repository metadata to write
217221
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
218222
* @param listener listener invoked on completion
219223
*/
220224
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
221-
IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map<String, Object> userMetadata,
222-
ActionListener<String> listener);
225+
@Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
226+
Map<String, Object> userMetadata, ActionListener<String> listener);
223227

224228
/**
225229
* Restores snapshot of the shard.

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 77 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import java.util.Collections;
127127
import java.util.List;
128128
import java.util.Map;
129+
import java.util.Optional;
129130
import java.util.Set;
130131
import java.util.concurrent.BlockingQueue;
131132
import java.util.concurrent.ConcurrentHashMap;
@@ -1552,8 +1553,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
15521553

15531554
@Override
15541555
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
1555-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
1556-
Map<String, Object> userMetadata, ActionListener<String> listener) {
1556+
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
1557+
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
15571558
final ShardId shardId = store.shardId();
15581559
final long startTime = threadPool.absoluteTimeInMillis();
15591560
try {
@@ -1579,76 +1580,92 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
15791580
throw new IndexShardSnapshotFailedException(shardId,
15801581
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
15811582
}
1582-
1583-
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
1584-
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
1585-
store.incRef();
1586-
final Collection<String> fileNames;
1587-
final Store.MetadataSnapshot metadataFromStore;
1588-
try {
1589-
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
1590-
try {
1591-
logger.trace(
1592-
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
1593-
metadataFromStore = store.getMetadata(snapshotIndexCommit);
1594-
fileNames = snapshotIndexCommit.getFileNames();
1595-
} catch (IOException e) {
1596-
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
1583+
// First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
1584+
final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> {
1585+
for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
1586+
if (id.equals(snapshotFileSet.shardStateIdentifier())) {
1587+
return snapshotFileSet.indexFiles();
1588+
}
15971589
}
1598-
} finally {
1599-
store.decRef();
1600-
}
1590+
return null;
1591+
}).orElse(null);
1592+
1593+
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
16011594
int indexIncrementalFileCount = 0;
16021595
int indexTotalNumberOfFiles = 0;
16031596
long indexIncrementalSize = 0;
1604-
long indexTotalFileCount = 0;
1605-
for (String fileName : fileNames) {
1606-
if (snapshotStatus.isAborted()) {
1607-
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
1608-
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
1597+
long indexTotalFileSize = 0;
1598+
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
1599+
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
1600+
// in the commit with files already in the repository
1601+
if (filesFromSegmentInfos == null) {
1602+
indexCommitPointFiles = new ArrayList<>();
1603+
store.incRef();
1604+
final Collection<String> fileNames;
1605+
final Store.MetadataSnapshot metadataFromStore;
1606+
try {
1607+
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
1608+
try {
1609+
logger.trace(
1610+
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
1611+
metadataFromStore = store.getMetadata(snapshotIndexCommit);
1612+
fileNames = snapshotIndexCommit.getFileNames();
1613+
} catch (IOException e) {
1614+
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
1615+
}
1616+
} finally {
1617+
store.decRef();
16091618
}
1619+
for (String fileName : fileNames) {
1620+
if (snapshotStatus.isAborted()) {
1621+
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
1622+
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
1623+
}
16101624

1611-
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
1612-
final StoreFileMetaData md = metadataFromStore.get(fileName);
1613-
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
1614-
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
1615-
if (filesInfo != null) {
1616-
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
1617-
if (fileInfo.isSame(md)) {
1618-
// a commit point file with the same name, size and checksum was already copied to repository
1619-
// we will reuse it for this snapshot
1620-
existingFileInfo = fileInfo;
1621-
break;
1625+
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
1626+
final StoreFileMetaData md = metadataFromStore.get(fileName);
1627+
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
1628+
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
1629+
if (filesInfo != null) {
1630+
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
1631+
if (fileInfo.isSame(md)) {
1632+
// a commit point file with the same name, size and checksum was already copied to repository
1633+
// we will reuse it for this snapshot
1634+
existingFileInfo = fileInfo;
1635+
break;
1636+
}
16221637
}
16231638
}
1624-
}
16251639

1626-
// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
1627-
// directly in the shard level metadata in this case
1628-
final boolean needsWrite = md.hashEqualsContents() == false;
1629-
indexTotalFileCount += md.length();
1630-
indexTotalNumberOfFiles++;
1631-
1632-
if (existingFileInfo == null) {
1633-
indexIncrementalFileCount++;
1634-
indexIncrementalSize += md.length();
1635-
// create a new FileInfo
1636-
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
1637-
new BlobStoreIndexShardSnapshot.FileInfo(
1638-
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
1639-
md, chunkSize());
1640-
indexCommitPointFiles.add(snapshotFileInfo);
1641-
if (needsWrite) {
1642-
filesToSnapshot.add(snapshotFileInfo);
1640+
// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
1641+
// directly in the shard level metadata in this case
1642+
final boolean needsWrite = md.hashEqualsContents() == false;
1643+
indexTotalFileSize += md.length();
1644+
indexTotalNumberOfFiles++;
1645+
1646+
if (existingFileInfo == null) {
1647+
indexIncrementalFileCount++;
1648+
indexIncrementalSize += md.length();
1649+
// create a new FileInfo
1650+
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
1651+
new BlobStoreIndexShardSnapshot.FileInfo(
1652+
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
1653+
md, chunkSize());
1654+
indexCommitPointFiles.add(snapshotFileInfo);
1655+
if (needsWrite) {
1656+
filesToSnapshot.add(snapshotFileInfo);
1657+
}
1658+
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
1659+
} else {
1660+
indexCommitPointFiles.add(existingFileInfo);
16431661
}
1644-
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
1645-
} else {
1646-
indexCommitPointFiles.add(existingFileInfo);
16471662
}
1663+
} else {
1664+
indexCommitPointFiles = filesFromSegmentInfos;
16481665
}
16491666

16501667
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
1651-
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
1668+
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);
16521669

16531670
final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
16541671
allFilesUploadedListener.whenComplete(v -> {
@@ -1673,7 +1690,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
16731690
}
16741691
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
16751692
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
1676-
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
1693+
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
16771694
for (SnapshotFiles point : snapshots) {
16781695
newSnapshotsList.add(point);
16791696
}
@@ -1760,7 +1777,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
17601777
final BlobContainer container = shardContainer(indexId, snapshotShardId);
17611778
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
17621779
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
1763-
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
1780+
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
17641781
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
17651782
@Override
17661783
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,

0 commit comments

Comments (0)