@@ -1115,7 +1115,18 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
 protected void doRun() {
     try {
         if (alreadyFailed.get() == false) {
-            snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+            if (store.tryIncRef()) {

Contributor Author:

This whole loop is kinda awkward to begin with ... makes me wonder if we shouldn't just run this on the generic pool and make the parallelism for snapshots configurable explicitly, exactly like we do for recoveries ...

Member:

> make the parallelism for snapshots configurable explicitly, exactly like we do for recoveries

Not sure I follow you :(

Contributor Author (@original-brownbear), Oct 4, 2019:

Sorry, badly explained :)

I find this whole loop over all the files really strange. We currently create one Runnable for each file to upload individually and then enqueue all of those runnables. That forces us to use the strange alreadyFailed flag so we don't get crazy exceptions, and also to increment and decrement the ref count on the store for each file individually.
It seems like it would be more correct, simpler, and less hacky to just have a queue of files and have workers pull from that queue until it's empty. Then each worker could get that reference once, and we wouldn't have to run all N tasks for N files even if the first file fails to upload.
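
A minimal, illustrative sketch of the queue-based alternative described above (editor's addition, not the PR's code): the Store, FileInfo, and FileUploader types are hypothetical stand-ins for the repository's actual Store, BlobStoreIndexShardSnapshot.FileInfo, and snapshotFile(...) logic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class QueuedFileUploadSketch {

    interface Store {            // hypothetical stand-in for org.elasticsearch.index.store.Store
        boolean tryIncRef();
        void decRef();
    }

    interface FileInfo {}        // hypothetical stand-in for BlobStoreIndexShardSnapshot.FileInfo

    interface FileUploader {     // hypothetical stand-in for the existing snapshotFile(...) logic
        void snapshotFile(FileInfo fileInfo) throws Exception;
    }

    void uploadAll(Store store, Iterable<FileInfo> files, FileUploader uploader, int workers) throws Exception {
        final Queue<FileInfo> queue = new ConcurrentLinkedQueue<>();
        files.forEach(queue::add);
        final AtomicReference<Exception> failure = new AtomicReference<>();
        final ExecutorService executor = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                // Each worker acquires the store reference once, instead of once per file.
                if (store.tryIncRef() == false) {
                    return; // store already closed, nothing to do
                }
                try {
                    FileInfo next;
                    // Stop pulling work as soon as any worker has failed, so we never
                    // run all N tasks for N files after the first upload failure.
                    while (failure.get() == null && (next = queue.poll()) != null) {
                        try {
                            uploader.snapshotFile(next);
                        } catch (Exception e) {
                            failure.compareAndSet(null, e);
                        }
                    }
                } finally {
                    store.decRef();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        if (failure.get() != null) {
            throw failure.get();
        }
    }
}
```

With a shared queue, the alreadyFailed flag disappears: workers simply stop polling once any upload has failed, and each worker touches the store's ref count exactly once.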

Member:

Thanks for explaining, it makes sense, but I don't see this as a requirement to merge this PR. Let's keep it in mind for the rainy, boring days ;)

Contributor Author:

Yea, this was more of a general comment to justify the weird code :)

+                try {
+                    snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                } finally {
+                    store.decRef();
+                }
+            } else if (snapshotStatus.isAborted()) {
+                throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+            } else {
+                assert false : "Store was closed before aborting the snapshot";
+                throw new IllegalStateException("Store is closed already");
+            }
         }
         filesListener.onResponse(null);
     } catch (IOException e) {

@@ -1316,7 +1327,6 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId
                           IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException {
     final BlobContainer shardContainer = shardContainer(indexId, shardId);
     final String file = fileInfo.physicalName();
-    store.incRef();
     try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
         for (int i = 0; i < fileInfo.numberOfParts(); i++) {
             final long partBytes = fileInfo.partBytes(i);

@@ -1356,8 +1366,6 @@ private void checkAborted() {
         failStoreIfCorrupted(store, t);
         snapshotStatus.addProcessedFile(0);
         throw t;
-    } finally {
-        store.decRef();
     }
 }

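As an aside to the review thread above, here is a hypothetical sketch of the "make the parallelism for snapshots configurable explicitly, exactly like we do for recoveries" idea (editor's addition). The setting key, default, and bounds are assumptions and are not part of this PR; it is merely analogous in spirit to existing recovery settings such as indices.recovery.max_concurrent_file_chunks.

```java
// Hypothetical sketch only: the setting key, default, and wiring below are assumptions,
// not code from this PR.
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

final class SnapshotUploadSettings {

    // Assumed setting, modeled on recovery settings like indices.recovery.max_concurrent_file_chunks.
    static final Setting<Integer> MAX_CONCURRENT_FILE_UPLOADS =
        Setting.intSetting("snapshot.max_concurrent_file_uploads", 5, 1, Setting.Property.NodeScope);

    // Workers pulling from a shared file queue (as in the sketch in the thread above)
    // would be capped at this number instead of enqueueing one runnable per file.
    static int maxConcurrentFileUploads(Settings nodeSettings) {
        return MAX_CONCURRENT_FILE_UPLOADS.get(nodeSettings);
    }
}
```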