@@ -239,12 +239,19 @@ void handlePublishResponse(PublishResponse publishResponse) {
239239 if (applyCommitRequest .isPresent ()) {
240240 sendApplyCommit ();
241241 } else {
242- Publication .this .handlePublishResponse (discoveryNode , publishResponse ).ifPresent (applyCommit -> {
243- assert applyCommitRequest .isPresent () == false ;
244- applyCommitRequest = Optional .of (applyCommit );
245- ackListener .onCommit (TimeValue .timeValueMillis (currentTimeSupplier .getAsLong () - startTime ));
246- publicationTargets .stream ().filter (PublicationTarget ::isWaitingForQuorum ).forEach (PublicationTarget ::sendApplyCommit );
247- });
242+ try {
243+ Publication .this .handlePublishResponse (discoveryNode , publishResponse ).ifPresent (applyCommit -> {
244+ assert applyCommitRequest .isPresent () == false ;
245+ applyCommitRequest = Optional .of (applyCommit );
246+ ackListener .onCommit (TimeValue .timeValueMillis (currentTimeSupplier .getAsLong () - startTime ));
247+ publicationTargets .stream ().filter (PublicationTarget ::isWaitingForQuorum ).forEach (PublicationTarget ::sendApplyCommit );
248+ });
249+ } catch (Exception e ) {
250+ setFailed (e );
251+ onPossibleCommitFailure ();
252+ } finally {
253+ assert publicationCompletedIffAllTargetsInactiveOrCancelled ();
254+ }
248255 }
249256 }
250257
@@ -333,15 +340,9 @@ public void onResponse(PublishWithJoinResponse response) {
333340
334341 assert state == PublicationTargetState .SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState .WAITING_FOR_QUORUM ;
335342 state = PublicationTargetState .WAITING_FOR_QUORUM ;
343+ handlePublishResponse (response .getPublishResponse ());
336344
337- try {
338- handlePublishResponse (response .getPublishResponse ());
339- } catch (Exception e ) {
340- setFailed (e );
341- onPossibleCommitFailure ();
342- } finally {
343- assert publicationCompletedIffAllTargetsInactiveOrCancelled ();
344- }
345+ assert publicationCompletedIffAllTargetsInactiveOrCancelled ();
345346 }
346347
347348 @ Override
0 commit comments