1919
2020package org .elasticsearch .discovery ;
2121
22+ import org .apache .logging .log4j .Level ;
23+ import org .apache .logging .log4j .LogManager ;
24+ import org .apache .logging .log4j .Logger ;
2225import org .apache .lucene .util .SetOnce ;
26+ import org .elasticsearch .ElasticsearchException ;
2327import org .elasticsearch .Version ;
2428import org .elasticsearch .action .ActionListener ;
2529import org .elasticsearch .cluster .ClusterName ;
2630import org .elasticsearch .cluster .node .DiscoveryNode ;
31+ import org .elasticsearch .common .Nullable ;
32+ import org .elasticsearch .common .logging .Loggers ;
2733import org .elasticsearch .common .settings .Settings ;
34+ import org .elasticsearch .common .transport .TransportAddress ;
2835import org .elasticsearch .test .ESTestCase ;
36+ import org .elasticsearch .test .MockLogAppender ;
37+ import org .elasticsearch .test .junit .annotations .TestLogging ;
2938import org .elasticsearch .test .transport .MockTransport ;
3039import org .elasticsearch .threadpool .TestThreadPool ;
3140import org .elasticsearch .threadpool .ThreadPool ;
41+ import org .elasticsearch .transport .ConnectTransportException ;
42+ import org .elasticsearch .transport .TransportException ;
3243import org .elasticsearch .transport .TransportRequest ;
3344import org .elasticsearch .transport .TransportService ;
3445import org .elasticsearch .transport .TransportService .HandshakeResponse ;
4455import static org .elasticsearch .discovery .HandshakingTransportAddressConnector .PROBE_HANDSHAKE_TIMEOUT_SETTING ;
4556import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
4657import static org .hamcrest .Matchers .equalTo ;
58+ import static org .hamcrest .Matchers .notNullValue ;
59+ import static org .hamcrest .Matchers .oneOf ;
4760
4861public class HandshakingTransportAddressConnectorTests extends ESTestCase {
4962
5063 private DiscoveryNode remoteNode ;
64+ private TransportAddress discoveryAddress ;
5165 private TransportService transportService ;
5266 private ThreadPool threadPool ;
5367 private String remoteClusterName ;
5468 private HandshakingTransportAddressConnector handshakingTransportAddressConnector ;
5569 private DiscoveryNode localNode ;
5670
5771 private boolean dropHandshake ;
72+ @ Nullable // unless we want the full connection to fail
73+ private TransportException fullConnectionFailure ;
5874
5975 @ Before
6076 public void startServices () {
@@ -66,17 +82,24 @@ public void startServices() {
6682 threadPool = new TestThreadPool ("node" , settings );
6783
6884 remoteNode = null ;
85+ discoveryAddress = null ;
6986 remoteClusterName = null ;
7087 dropHandshake = false ;
88+ fullConnectionFailure = null ;
7189
7290 final MockTransport mockTransport = new MockTransport () {
7391 @ Override
7492 protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode node ) {
7593 super .onSendRequest (requestId , action , request , node );
7694 assertThat (action , equalTo (TransportService .HANDSHAKE_ACTION_NAME ));
77- assertEquals (remoteNode .getAddress (), node .getAddress ());
95+ assertThat (discoveryAddress , notNullValue ());
96+ assertThat (node .getAddress (), oneOf (discoveryAddress , remoteNode .getAddress ()));
7897 if (dropHandshake == false ) {
79- handleResponse (requestId , new HandshakeResponse (remoteNode , new ClusterName (remoteClusterName ), Version .CURRENT ));
98+ if (fullConnectionFailure != null && node .getAddress ().equals (remoteNode .getAddress ())) {
99+ handleError (requestId , fullConnectionFailure );
100+ } else {
101+ handleResponse (requestId , new HandshakeResponse (remoteNode , new ClusterName (remoteClusterName ), Version .CURRENT ));
102+ }
80103 }
81104 }
82105 };
@@ -91,7 +114,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
91114 }
92115
93116 @ After
94- public void stopServices () throws InterruptedException {
117+ public void stopServices () {
95118 transportService .stop ();
96119 terminate (threadPool );
97120 }
@@ -102,8 +125,9 @@ public void testConnectsToMasterNode() throws InterruptedException {
102125
103126 remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
104127 remoteClusterName = "local-cluster" ;
128+ discoveryAddress = getDiscoveryAddress ();
105129
106- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , new ActionListener <DiscoveryNode >() {
130+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , new ActionListener <DiscoveryNode >() {
107131 @ Override
108132 public void onResponse (DiscoveryNode discoveryNode ) {
109133 receivedNode .set (discoveryNode );
@@ -120,44 +144,84 @@ public void onFailure(Exception e) {
120144 assertEquals (remoteNode , receivedNode .get ());
121145 }
122146
147+ @ TestLogging (reason ="ensure logging happens" , value ="org.elasticsearch.discovery.HandshakingTransportAddressConnector:INFO" )
148+ public void testLogsFullConnectionFailureAfterSuccessfulHandshake () throws Exception {
149+
150+ remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
151+ remoteClusterName = "local-cluster" ;
152+ discoveryAddress = buildNewFakeTransportAddress ();
153+
154+ fullConnectionFailure = new ConnectTransportException (remoteNode , "simulated" , new ElasticsearchException ("root cause" ));
155+
156+ FailureListener failureListener = new FailureListener ();
157+
158+ MockLogAppender mockAppender = new MockLogAppender ();
159+ mockAppender .start ();
160+ mockAppender .addExpectation (
161+ new MockLogAppender .SeenEventExpectation (
162+ "message" ,
163+ HandshakingTransportAddressConnector .class .getCanonicalName (),
164+ Level .WARN ,
165+ "*completed handshake with [*] but followup connection failed*" ));
166+ Logger targetLogger = LogManager .getLogger (HandshakingTransportAddressConnector .class );
167+ Loggers .addAppender (targetLogger , mockAppender );
168+
169+ try {
170+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
171+ failureListener .assertFailure ();
172+ mockAppender .assertAllExpectationsMatched ();
173+ } finally {
174+ Loggers .removeAppender (targetLogger , mockAppender );
175+ mockAppender .stop ();
176+ }
177+ }
178+
123179 public void testDoesNotConnectToNonMasterNode () throws InterruptedException {
124180 remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
181+ discoveryAddress = getDiscoveryAddress ();
125182 remoteClusterName = "local-cluster" ;
126183
127184 FailureListener failureListener = new FailureListener ();
128- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
185+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
129186 failureListener .assertFailure ();
130187 }
131188
132189 public void testDoesNotConnectToLocalNode () throws Exception {
133190 remoteNode = localNode ;
191+ discoveryAddress = getDiscoveryAddress ();
134192 remoteClusterName = "local-cluster" ;
135193
136194 FailureListener failureListener = new FailureListener ();
137- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
195+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
138196 failureListener .assertFailure ();
139197 }
140198
141199 public void testDoesNotConnectToDifferentCluster () throws InterruptedException {
142200 remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
201+ discoveryAddress = getDiscoveryAddress ();
143202 remoteClusterName = "another-cluster" ;
144203
145204 FailureListener failureListener = new FailureListener ();
146- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
205+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
147206 failureListener .assertFailure ();
148207 }
149208
150209 public void testHandshakeTimesOut () throws InterruptedException {
151210 remoteNode = new DiscoveryNode ("remote-node" , buildNewFakeTransportAddress (), Version .CURRENT );
211+ discoveryAddress = getDiscoveryAddress ();
152212 remoteClusterName = "local-cluster" ;
153213 dropHandshake = true ;
154214
155215 FailureListener failureListener = new FailureListener ();
156- handshakingTransportAddressConnector .connectToRemoteMasterNode (remoteNode . getAddress () , failureListener );
216+ handshakingTransportAddressConnector .connectToRemoteMasterNode (discoveryAddress , failureListener );
157217 Thread .sleep (PROBE_HANDSHAKE_TIMEOUT_SETTING .get (Settings .EMPTY ).millis ());
158218 failureListener .assertFailure ();
159219 }
160220
221+ private TransportAddress getDiscoveryAddress () {
222+ return randomBoolean () ? remoteNode .getAddress () : buildNewFakeTransportAddress ();
223+ }
224+
161225 private class FailureListener implements ActionListener <DiscoveryNode > {
162226 final CountDownLatch completionLatch = new CountDownLatch (1 );
163227
0 commit comments