2323import org .apache .logging .log4j .message .ParameterizedMessage ;
2424import org .apache .lucene .store .AlreadyClosedException ;
2525import org .apache .lucene .util .SetOnce ;
26- import org .elasticsearch .Version ;
2726import org .elasticsearch .action .ActionListener ;
2827import org .elasticsearch .action .StepListener ;
2928import org .elasticsearch .action .admin .cluster .state .ClusterStateAction ;
5352import java .util .concurrent .ExecutorService ;
5453import java .util .concurrent .RejectedExecutionException ;
5554import java .util .concurrent .atomic .AtomicBoolean ;
56- import java .util .concurrent .atomic .AtomicLong ;
5755import java .util .function .Consumer ;
5856import java .util .function .Function ;
5957import java .util .function .Predicate ;
@@ -78,7 +76,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
7876 private static final Logger logger = LogManager .getLogger (RemoteClusterConnection .class );
7977
8078 private final TransportService transportService ;
81- private final ConnectionManager connectionManager ;
79+ private final RemoteConnectionManager remoteConnectionManager ;
8280 private final String clusterAlias ;
8381 private final int maxNumRemoteConnections ;
8482 private final Predicate <DiscoveryNode > nodePredicate ;
@@ -116,7 +114,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
116114 this .maxNumRemoteConnections = maxNumRemoteConnections ;
117115 this .nodePredicate = nodePredicate ;
118116 this .clusterAlias = clusterAlias ;
119- this .connectionManager = connectionManager ;
117+ this .remoteConnectionManager = new RemoteConnectionManager ( clusterAlias , connectionManager ) ;
120118 this .seedNodes = Collections .unmodifiableList (seedNodes );
121119 this .skipUnavailable = RemoteClusterService .REMOTE_CLUSTER_SKIP_UNAVAILABLE
122120 .getConcreteSettingForNamespace (clusterAlias ).get (settings );
@@ -168,8 +166,8 @@ boolean isSkipUnavailable() {
168166 }
169167
170168 @ Override
171- public void onNodeDisconnected (DiscoveryNode node ) {
172- if (connectionManager .size () < maxNumRemoteConnections ) {
169+ public void onNodeDisconnected (DiscoveryNode node , Transport . Connection connection ) {
170+ if (remoteConnectionManager .size () < maxNumRemoteConnections ) {
173171 // try to reconnect and fill up the slot of the disconnected node
174172 connectHandler .connect (ActionListener .wrap (
175173 ignore -> logger .trace ("successfully connected after disconnect of {}" , node ),
@@ -182,7 +180,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
182180 * will invoke the listener immediately.
183181 */
184182 void ensureConnected (ActionListener <Void > voidActionListener ) {
185- if (connectionManager .size () == 0 ) {
183+ if (remoteConnectionManager .size () == 0 ) {
186184 connectHandler .connect (voidActionListener );
187185 } else {
188186 voidActionListener .onResponse (null );
@@ -211,8 +209,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
211209 request .clear ();
212210 request .nodes (true );
213211 request .local (true ); // run this on the node that gets the request it's as good as any other
214- final DiscoveryNode node = getAnyConnectedNode ();
215- Transport .Connection connection = connectionManager .getConnection (node );
212+ Transport .Connection connection = remoteConnectionManager .getAnyRemoteConnection ();
216213 transportService .sendRequest (connection , ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
217214 new TransportResponseHandler <ClusterStateResponse >() {
218215
@@ -256,12 +253,7 @@ public String executor() {
256253 * If such node is not connected, the returned connection will be a proxy connection that redirects to it.
257254 */
258255 Transport .Connection getConnection (DiscoveryNode remoteClusterNode ) {
259- if (connectionManager .nodeConnected (remoteClusterNode )) {
260- return connectionManager .getConnection (remoteClusterNode );
261- }
262- DiscoveryNode discoveryNode = getAnyConnectedNode ();
263- Transport .Connection connection = connectionManager .getConnection (discoveryNode );
264- return new ProxyConnection (connection , remoteClusterNode );
256+ return remoteConnectionManager .getRemoteConnection (remoteClusterNode );
265257 }
266258
267259 private Predicate <ClusterName > getRemoteClusterNamePredicate () {
@@ -280,67 +272,19 @@ public String toString() {
280272 };
281273 }
282274
283-
284- static final class ProxyConnection implements Transport .Connection {
285- private final Transport .Connection proxyConnection ;
286- private final DiscoveryNode targetNode ;
287-
288- private ProxyConnection (Transport .Connection proxyConnection , DiscoveryNode targetNode ) {
289- this .proxyConnection = proxyConnection ;
290- this .targetNode = targetNode ;
291- }
292-
293- @ Override
294- public DiscoveryNode getNode () {
295- return targetNode ;
296- }
297-
298- @ Override
299- public void sendRequest (long requestId , String action , TransportRequest request , TransportRequestOptions options )
300- throws IOException , TransportException {
301- proxyConnection .sendRequest (requestId , TransportActionProxy .getProxyAction (action ),
302- TransportActionProxy .wrapRequest (targetNode , request ), options );
303- }
304-
305- @ Override
306- public void close () {
307- assert false : "proxy connections must not be closed" ;
308- }
309-
310- @ Override
311- public void addCloseListener (ActionListener <Void > listener ) {
312- proxyConnection .addCloseListener (listener );
313- }
314-
315- @ Override
316- public boolean isClosed () {
317- return proxyConnection .isClosed ();
318- }
319-
320- @ Override
321- public Version getVersion () {
322- return proxyConnection .getVersion ();
323- }
324- }
325-
326275 Transport .Connection getConnection () {
327- return connectionManager . getConnection ( getAnyConnectedNode () );
276+ return remoteConnectionManager . getAnyRemoteConnection ( );
328277 }
329278
330279 @ Override
331280 public void close () throws IOException {
332- IOUtils .close (connectHandler );
333- connectionManager .closeNoBlock ();
281+ IOUtils .close (connectHandler , remoteConnectionManager );
334282 }
335283
336284 public boolean isClosed () {
337285 return connectHandler .isClosed ();
338286 }
339287
340- public String getProxyAddress () {
341- return proxyAddress ;
342- }
343-
344288 public List <Tuple <String , Supplier <DiscoveryNode >>> getSeedNodes () {
345289 return seedNodes ;
346290 }
@@ -456,14 +400,14 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
456400 final ConnectionProfile profile = ConnectionProfile .buildSingleChannelProfile (TransportRequestOptions .Type .REG );
457401 final StepListener <Transport .Connection > openConnectionStep = new StepListener <>();
458402 try {
459- connectionManager .openConnection (seedNode , profile , openConnectionStep );
403+ remoteConnectionManager .openConnection (seedNode , profile , openConnectionStep );
460404 } catch (Exception e ) {
461405 onFailure .accept (e );
462406 }
463407
464408 final StepListener <TransportService .HandshakeResponse > handShakeStep = new StepListener <>();
465409 openConnectionStep .whenComplete (connection -> {
466- ConnectionProfile connectionProfile = connectionManager .getConnectionProfile ();
410+ ConnectionProfile connectionProfile = remoteConnectionManager . getConnectionManager () .getConnectionProfile ();
467411 transportService .handshake (connection , connectionProfile .getHandshakeTimeout ().millis (),
468412 getRemoteClusterNamePredicate (), handShakeStep );
469413 }, onFailure );
@@ -472,8 +416,8 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
472416 handShakeStep .whenComplete (handshakeResponse -> {
473417 final DiscoveryNode handshakeNode = maybeAddProxyAddress (proxyAddress , handshakeResponse .getDiscoveryNode ());
474418
475- if (nodePredicate .test (handshakeNode ) && connectionManager .size () < maxNumRemoteConnections ) {
476- connectionManager .connectToNode (handshakeNode , null ,
419+ if (nodePredicate .test (handshakeNode ) && remoteConnectionManager .size () < maxNumRemoteConnections ) {
420+ remoteConnectionManager .connectToNode (handshakeNode , null ,
477421 transportService .connectionValidator (handshakeNode ), fullConnectionStep );
478422 } else {
479423 fullConnectionStep .onResponse (null );
@@ -565,8 +509,8 @@ public void handleResponse(ClusterStateResponse response) {
565509 private void handleNodes (Iterator <DiscoveryNode > nodesIter ) {
566510 while (nodesIter .hasNext ()) {
567511 final DiscoveryNode node = maybeAddProxyAddress (proxyAddress , nodesIter .next ());
568- if (nodePredicate .test (node ) && connectionManager .size () < maxNumRemoteConnections ) {
569- connectionManager .connectToNode (node , null ,
512+ if (nodePredicate .test (node ) && remoteConnectionManager .size () < maxNumRemoteConnections ) {
513+ remoteConnectionManager .connectToNode (node , null ,
570514 transportService .connectionValidator (node ), new ActionListener <>() {
571515 @ Override
572516 public void onResponse (Void aVoid ) {
@@ -625,20 +569,7 @@ boolean assertNoRunningConnections() { // for testing only
625569 }
626570
627571 boolean isNodeConnected (final DiscoveryNode node ) {
628- return connectionManager .nodeConnected (node );
629- }
630-
631- private final AtomicLong nextNodeId = new AtomicLong ();
632-
633- DiscoveryNode getAnyConnectedNode () {
634- List <DiscoveryNode > nodes = new ArrayList <>(connectionManager .connectedNodes ());
635- if (nodes .isEmpty ()) {
636- throw new NoSuchRemoteClusterException (clusterAlias );
637- } else {
638- long curr ;
639- while ((curr = nextNodeId .incrementAndGet ()) == Long .MIN_VALUE );
640- return nodes .get (Math .floorMod (curr , nodes .size ()));
641- }
572+ return remoteConnectionManager .getConnectionManager ().nodeConnected (node );
642573 }
643574
644575 /**
@@ -655,14 +586,14 @@ public RemoteConnectionInfo getConnectionInfo() {
655586 }
656587
657588 int getNumNodesConnected () {
658- return connectionManager .size ();
589+ return remoteConnectionManager .size ();
659590 }
660591
661592 private static ConnectionManager createConnectionManager (ConnectionProfile connectionProfile , TransportService transportService ) {
662593 return new ConnectionManager (connectionProfile , transportService .transport );
663594 }
664595
665596 ConnectionManager getConnectionManager () {
666- return connectionManager ;
597+ return remoteConnectionManager . getConnectionManager () ;
667598 }
668599}
0 commit comments