Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots);
}

/**
 * Returns a copy of this instance that carries the given generation id; every other
 * field is shared with this instance. If the generation already matches, this
 * instance itself is returned (no allocation).
 *
 * @param newGeneration generation id for the returned instance
 * @return this instance if {@code newGeneration == genId}, otherwise a new copy
 */
public RepositoryData withGenId(long newGeneration) {
    return newGeneration == genId
        ? this
        : new RepositoryData(newGeneration, snapshotIds, snapshotStates, indexSnapshots);
}

/**
* Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,10 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
final RepositoryData updatedRepositoryData;
final Map<String, BlobContainer> foundIndices;
final Set<String> rootBlobs;
try {
final RepositoryData repositoryData = getRepositoryData();
rootBlobs = blobContainer().listBlobs().keySet();
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs));
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
Expand Down Expand Up @@ -666,7 +668,20 @@ public void endVerification(String seed) {
@Override
public RepositoryData getRepositoryData() {
    try {
        final long generation = latestIndexBlobId();
        return getRepositoryData(generation);
    } catch (NoSuchFileException ex) {
        // No index-N blob present at all: a freshly initialized, empty repository.
        return RepositoryData.EMPTY;
    } catch (IOException ioe) {
        throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
    }
}

private RepositoryData getRepositoryData(long indexGen) {
if (indexGen == RepositoryData.EMPTY_REPO_GEN) {
return RepositoryData.EMPTY;
}
try {
final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);

RepositoryData repositoryData;
Expand Down Expand Up @@ -694,14 +709,14 @@ public boolean isReadOnly() {
return readOnly;
}

protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException {
protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = latestIndexBlobId();
if (currentGen != repositoryStateId) {
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + currentGen +
expectedGen + "], actual current generation [" + currentGen +
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
Expand Down Expand Up @@ -767,14 +782,15 @@ long readSnapshotIndexLatestBlob() throws IOException {
}

/**
 * Lists the index-N blobs at the repository root and returns the highest generation found.
 * Removes the redundant, unused intermediate listing so the root container is only listed once.
 *
 * @return latest index-N generation, or {@code RepositoryData.EMPTY_REPO_GEN} when no
 *         index-N blob has been written yet
 * @throws IOException if listing the repository root blobs fails
 */
private long listBlobsToGetLatestIndexId() throws IOException {
    return latestGeneration(blobContainer().listBlobsByPrefix(INDEX_FILE_PREFIX).keySet());
}

private long latestGeneration(Collection<String> rootBlobs) {
long latest = RepositoryData.EMPTY_REPO_GEN;
if (blobs.isEmpty()) {
// no snapshot index blobs have been written yet
return latest;
}
for (final BlobMetaData blobMetaData : blobs.values()) {
final String blobName = blobMetaData.name();
for (String blobName : rootBlobs) {
if (blobName.startsWith(INDEX_FILE_PREFIX) == false) {
continue;
}
try {
final long curr = Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length()));
latest = Math.max(latest, curr);
Expand Down Expand Up @@ -809,9 +825,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}

Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
long fileListGeneration = tuple.v2();

if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
throw new IndexShardSnapshotFailedException(shardId,
Expand Down Expand Up @@ -1013,9 +1029,9 @@ private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, Snaps
throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
}

Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
long fileListGeneration = tuple.v2();

try {
indexShardSnapshotFormat.delete(shardContainer, snapshotId.getUUID());
Expand Down Expand Up @@ -1058,9 +1074,9 @@ private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContain
* @param blobs list of blobs in the container
* @param reason a reason explaining why the shard index file is written
*/
private void finalizeShard(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs,
private void finalizeShard(List<SnapshotFiles> snapshots, long fileListGeneration, Map<String, BlobMetaData> blobs,
String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) {
final String indexGeneration = Integer.toString(fileListGeneration + 1);
final String indexGeneration = Long.toString(fileListGeneration + 1);
try {
final List<String> blobsToDelete;
if (snapshots.isEmpty()) {
Expand Down Expand Up @@ -1094,26 +1110,14 @@ private void finalizeShard(List<SnapshotFiles> snapshots, int fileListGeneration
* @param blobs list of blobs in repository
* @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/
private Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs,
private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs,
BlobContainer shardContainer) {
int latest = -1;
Set<String> blobKeys = blobs.keySet();
for (String name : blobKeys) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
long latest = latestGeneration(blobKeys);
if (latest >= 0) {
try {
final BlobStoreIndexShardSnapshots shardSnapshots =
indexShardSnapshotsFormat.read(shardContainer, Integer.toString(latest));
indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} catch (IOException e) {
final String file = SNAPSHOT_INDEX_PREFIX + latest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ public void testRepositoryDataConcurrentModificationNotAllowed() throws IOExcept

// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData, repositoryData.getGenId());
final long startingGeneration = repositoryData.getGenId();
repository.writeIndexGen(repositoryData, startingGeneration);

// write repo data again to index generational file, errors because we already wrote to the
// N+1 generation from which this repository data instance was created
expectThrows(RepositoryException.class, () -> repository.writeIndexGen(repositoryData, repositoryData.getGenId()));
expectThrows(RepositoryException.class, () -> repository.writeIndexGen(
repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId()));
}

public void testBadChunksize() throws Exception {
Expand Down