4242import java .util .Optional ;
4343import java .util .Set ;
4444import java .util .concurrent .TimeUnit ;
45+ import java .util .concurrent .atomic .AtomicBoolean ;
4546import java .util .concurrent .atomic .AtomicInteger ;
4647import java .util .function .Function ;
4748import java .util .function .LongSupplier ;
4849import java .util .stream .Collector ;
4950import java .util .stream .Collectors ;
5051import java .util .stream .Stream ;
5152
53+ import static org .hamcrest .Matchers .containsInAnyOrder ;
5254import static org .hamcrest .Matchers .equalTo ;
5355
5456public class PublicationTests extends ESTestCase {
@@ -97,6 +99,7 @@ abstract class MockPublication extends Publication {
9799
98100 Map <DiscoveryNode , ActionListener <PublishWithJoinResponse >> pendingPublications = new HashMap <>();
99101 Map <DiscoveryNode , ActionListener <TransportResponse .Empty >> pendingCommits = new HashMap <>();
102+ Map <DiscoveryNode , PublishWithJoinResponse > possibleJoins = new HashMap <>();
100103
101104 MockPublication (Settings settings , PublishRequest publishRequest , Discovery .AckListener ackListener ,
102105 LongSupplier currentTimeSupplier ) {
@@ -106,13 +109,14 @@ abstract class MockPublication extends Publication {
106109
107110 @ Override
108111 protected void onCompletion (boolean committed ) {
112+ assertFalse (completed );
109113 completed = true ;
110114 this .committed = committed ;
111115 }
112116
113117 @ Override
114118 protected void onPossibleJoin (DiscoveryNode sourceNode , PublishWithJoinResponse response ) {
115-
119+ assertNull ( possibleJoins . put ( sourceNode , response ));
116120 }
117121
118122 @ Override
@@ -166,25 +170,39 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
166170
167171 assertThat (publication .pendingPublications .keySet (), equalTo (discoNodes ));
168172 assertTrue (publication .pendingCommits .isEmpty ());
173+ AtomicBoolean processedNode1PublishResponse = new AtomicBoolean ();
169174 publication .pendingPublications .entrySet ().stream ().collect (shuffle ()).forEach (e -> {
170175 PublishResponse publishResponse = nodeResolver .apply (e .getKey ()).coordinationState .handlePublishRequest (
171176 publication .publishRequest );
172- e .getValue ().onResponse (new PublishWithJoinResponse (publishResponse , Optional .empty ()));
177+ assertNotEquals (processedNode1PublishResponse .get (), publication .pendingCommits .isEmpty ());
178+ assertFalse (publication .possibleJoins .containsKey (e .getKey ()));
179+ PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse (publishResponse ,
180+ randomBoolean () ? Optional .empty () : Optional .of (new Join (e .getKey (), n1 , randomNonNegativeLong (),
181+ randomNonNegativeLong (), randomNonNegativeLong ())));
182+ e .getValue ().onResponse (publishWithJoinResponse );
183+ assertTrue (publication .possibleJoins .containsKey (e .getKey ()));
184+ assertEquals (publishWithJoinResponse , publication .possibleJoins .get (e .getKey ()));
185+ if (e .getKey ().equals (n1 )) {
186+ processedNode1PublishResponse .set (true );
187+ }
188+ assertNotEquals (processedNode1PublishResponse .get (), publication .pendingCommits .isEmpty ());
173189 });
174190
175191 assertThat (publication .pendingCommits .keySet (), equalTo (discoNodes ));
176192 assertNotNull (publication .applyCommit );
177193 assertEquals (publication .applyCommit .getTerm (), publication .publishRequest .getAcceptedState ().term ());
178194 assertEquals (publication .applyCommit .getVersion (), publication .publishRequest .getAcceptedState ().version ());
179195 publication .pendingCommits .entrySet ().stream ().collect (shuffle ()).forEach (e -> {
196+ assertFalse (publication .completed );
197+ assertFalse (publication .committed );
180198 nodeResolver .apply (e .getKey ()).coordinationState .handleCommit (publication .applyCommit );
181199 e .getValue ().onResponse (TransportResponse .Empty .INSTANCE );
182200 });
183201
184202 assertTrue (publication .completed );
185203 assertTrue (publication .committed );
186204
187- assertThat (ackListener .await (0L , TimeUnit .SECONDS ). size (), equalTo ( 3 ));
205+ assertThat (ackListener .await (0L , TimeUnit .SECONDS ), containsInAnyOrder ( n1 , n2 , n3 ));
188206 }
189207
190208 public void testClusterStatePublishingWithFaultyNode () throws InterruptedException {
0 commit comments