Skip to content

Commit b009c35

Browse files
committed
complete
1 parent 03d178b commit b009c35

File tree

1 file changed

+60
-62
lines changed

1 file changed

+60
-62
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,8 @@ protected void doRun() {
308308

309309
for (Index deletedIndex : event.indicesDeleted()) {
310310
final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex);
311-
assert indexMetadata != null
312-
|| state.metadata().indexGraveyard().containsIndex(deletedIndex) : "no previous metadata found for " + deletedIndex;
311+
assert indexMetadata != null || state.metadata().indexGraveyard().containsIndex(deletedIndex)
312+
: "no previous metadata found for " + deletedIndex;
313313
if (indexMetadata != null) {
314314
final Settings indexSetting = indexMetadata.getSettings();
315315
if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) {
@@ -432,9 +432,7 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable {
432432

433433
@Override
434434
public void run() {
435-
final PeriodicMaintenanceTask maintenanceTask = this;
436435
assert assertGenericThread();
437-
438436
try {
439437
ensureOpen();
440438
if (pointIntTimeId == null) {
@@ -444,16 +442,16 @@ public void run() {
444442
@Override
445443
public void onResponse(OpenPointInTimeResponse response) {
446444
logger.trace("periodic maintenance task initialized with point-in-time id [{}]", response.getPointInTimeId());
447-
maintenanceTask.pointIntTimeId = response.getPointInTimeId();
448-
executeNext(maintenanceTask);
445+
PeriodicMaintenanceTask.this.pointIntTimeId = response.getPointInTimeId();
446+
executeNext(PeriodicMaintenanceTask.this);
449447
}
450448

451449
@Override
452450
public void onFailure(Exception e) {
453451
if (TransportActions.isShardNotAvailableException(e)) {
454-
complete(maintenanceTask, null);
452+
complete(null);
455453
} else {
456-
complete(maintenanceTask, e);
454+
complete(e);
457455
}
458456
}
459457
});
@@ -485,16 +483,16 @@ public void onFailure(Exception e) {
485483
@Override
486484
public void onResponse(SearchResponse response) {
487485
if (searchAfter == null) {
488-
maintenanceTask.total.compareAndSet(0L, response.getHits().getTotalHits().value);
486+
PeriodicMaintenanceTask.this.total.compareAndSet(0L, response.getHits().getTotalHits().value);
489487
}
490-
maintenanceTask.searchResponse = response;
491-
maintenanceTask.searchAfter = null;
492-
executeNext(maintenanceTask);
488+
PeriodicMaintenanceTask.this.searchResponse = response;
489+
PeriodicMaintenanceTask.this.searchAfter = null;
490+
executeNext(PeriodicMaintenanceTask.this);
493491
}
494492

495493
@Override
496494
public void onFailure(Exception e) {
497-
complete(maintenanceTask, e);
495+
complete(e);
498496
}
499497
});
500498
return;
@@ -586,25 +584,25 @@ public void onResponse(BulkResponse response) {
586584
for (BulkItemResponse itemResponse : response.getItems()) {
587585
if (itemResponse.isFailed() == false) {
588586
assert itemResponse.getResponse() instanceof DeleteResponse;
589-
maintenanceTask.deletes.incrementAndGet();
587+
PeriodicMaintenanceTask.this.deletes.incrementAndGet();
590588
}
591589
}
592-
maintenanceTask.searchResponse = null;
593-
maintenanceTask.searchAfter = finalSearchAfter;
594-
executeNext(maintenanceTask);
590+
PeriodicMaintenanceTask.this.searchResponse = null;
591+
PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter;
592+
executeNext(PeriodicMaintenanceTask.this);
595593
}
596594

597595
@Override
598596
public void onFailure(Exception e) {
599-
complete(maintenanceTask, e);
597+
complete(e);
600598
}
601599
});
602600
return;
603601
}
604602
// we're done, complete the task
605-
complete(this, null);
603+
complete(null);
606604
} catch (Exception e) {
607-
complete(this, e);
605+
complete(e);
608606
}
609607
}
610608

@@ -644,55 +642,55 @@ public void close() {
644642
}
645643
}
646644

647-
private boolean assertGenericThread() {
648-
final String threadName = Thread.currentThread().getName();
649-
assert threadName.contains(ThreadPool.Names.GENERIC) : threadName;
650-
return true;
645+
private void complete(@Nullable Exception failure) {
646+
assert isClosed() == false;
647+
final Releasable releasable = () -> {
648+
try {
649+
final Exception previous = error.getAndSet(failure);
650+
assert previous == null : "periodic maintenance task already failed: " + previous;
651+
close();
652+
} finally {
653+
startPeriodicTask();
654+
}
655+
};
656+
boolean waitForRelease = false;
657+
try {
658+
final String pitId = pointIntTimeId;
659+
if (Strings.hasLength(pitId)) {
660+
final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId);
661+
clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() {
662+
@Override
663+
public void onResponse(ClosePointInTimeResponse response) {
664+
if (response.isSucceeded()) {
665+
logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId);
666+
} else {
667+
logger.debug("point-in-time id [{}] not found", pitId);
668+
}
669+
}
670+
671+
@Override
672+
public void onFailure(Exception e) {
673+
logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e);
674+
}
675+
}, () -> Releasables.close(releasable)));
676+
waitForRelease = true;
677+
}
678+
} finally {
679+
if (waitForRelease == false) {
680+
Releasables.close(releasable);
681+
}
682+
}
651683
}
652684
}
653685

654686
private void executeNext(PeriodicMaintenanceTask maintenanceTask) {
655687
threadPool.generic().execute(maintenanceTask);
656688
}
657689

658-
private void complete(PeriodicMaintenanceTask maintenanceTask, @Nullable Exception failure) {
659-
assert maintenanceTask.isClosed() == false;
660-
final Releasable releasable = () -> {
661-
try {
662-
final Exception previous = maintenanceTask.error.getAndSet(failure);
663-
assert previous == null : "periodic maintenance task already failed: " + previous;
664-
maintenanceTask.close();
665-
} finally {
666-
startPeriodicTask();
667-
}
668-
};
669-
boolean waitForRelease = false;
670-
try {
671-
final String pitId = maintenanceTask.pointIntTimeId;
672-
if (Strings.hasLength(pitId)) {
673-
final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId);
674-
clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() {
675-
@Override
676-
public void onResponse(ClosePointInTimeResponse response) {
677-
if (response.isSucceeded()) {
678-
logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId);
679-
} else {
680-
logger.debug("point-in-time id [{}] not found", pitId);
681-
}
682-
}
683-
684-
@Override
685-
public void onFailure(Exception e) {
686-
logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e);
687-
}
688-
}, () -> Releasables.close(releasable)));
689-
waitForRelease = true;
690-
}
691-
} finally {
692-
if (waitForRelease == false) {
693-
Releasables.close(releasable);
694-
}
695-
}
690+
private static boolean assertGenericThread() {
691+
final String threadName = Thread.currentThread().getName();
692+
assert threadName.contains(ThreadPool.Names.GENERIC) : threadName;
693+
return true;
696694
}
697695

698696
private static Instant getCreationTime(SearchHit searchHit) {

0 commit comments

Comments
 (0)