|
17 | 17 | * under the License. |
18 | 18 | */ |
19 | 19 |
|
20 | | -package org.elasticsearch.discovery.zen; |
| 20 | +package org.elasticsearch.cluster.coordination; |
21 | 21 |
|
22 | | -import org.elasticsearch.ExceptionsHelper; |
23 | 22 | import org.elasticsearch.Version; |
24 | 23 | import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; |
25 | 24 | import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; |
26 | 25 | import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; |
27 | 26 | import org.elasticsearch.cluster.ClusterState; |
28 | 27 | import org.elasticsearch.cluster.metadata.MetaData; |
29 | 28 | import org.elasticsearch.cluster.node.DiscoveryNode; |
30 | | -import org.elasticsearch.cluster.node.DiscoveryNodes; |
31 | 29 | import org.elasticsearch.cluster.service.ClusterService; |
32 | 30 | import org.elasticsearch.common.Priority; |
33 | 31 | import org.elasticsearch.common.Strings; |
34 | | -import org.elasticsearch.common.bytes.BytesReference; |
35 | 32 | import org.elasticsearch.common.settings.Settings; |
36 | 33 | import org.elasticsearch.common.xcontent.ToXContent; |
37 | 34 | import org.elasticsearch.common.xcontent.XContentBuilder; |
38 | 35 | import org.elasticsearch.common.xcontent.XContentFactory; |
39 | 36 | import org.elasticsearch.discovery.Discovery; |
40 | 37 | import org.elasticsearch.discovery.DiscoveryStats; |
| 38 | +import org.elasticsearch.discovery.zen.FaultDetection; |
41 | 39 | import org.elasticsearch.node.Node; |
42 | 40 | import org.elasticsearch.test.ESIntegTestCase; |
43 | 41 | import org.elasticsearch.test.TestCustomMetaData; |
44 | | -import org.elasticsearch.test.discovery.TestZenDiscovery; |
45 | 42 | import org.elasticsearch.test.junit.annotations.TestLogging; |
46 | | -import org.elasticsearch.threadpool.ThreadPool; |
47 | | -import org.elasticsearch.transport.BytesTransportRequest; |
48 | | -import org.elasticsearch.transport.EmptyTransportResponseHandler; |
49 | | -import org.elasticsearch.transport.TransportException; |
50 | | -import org.elasticsearch.transport.TransportResponse; |
51 | | -import org.elasticsearch.transport.TransportService; |
| 43 | +import org.elasticsearch.transport.RemoteTransportException; |
52 | 44 |
|
53 | 45 | import java.io.IOException; |
54 | | -import java.net.UnknownHostException; |
55 | | -import java.util.ArrayList; |
56 | 46 | import java.util.EnumSet; |
57 | | -import java.util.List; |
| 47 | +import java.util.Optional; |
| 48 | +import java.util.concurrent.CompletableFuture; |
58 | 49 | import java.util.concurrent.CountDownLatch; |
59 | 50 | import java.util.concurrent.ExecutionException; |
| 51 | +import java.util.concurrent.TimeUnit; |
| 52 | +import java.util.concurrent.TimeoutException; |
60 | 53 | import java.util.concurrent.atomic.AtomicInteger; |
61 | | -import java.util.concurrent.atomic.AtomicReference; |
62 | 54 |
|
63 | | -import static java.util.Collections.emptyMap; |
64 | | -import static java.util.Collections.emptySet; |
65 | 55 | import static org.hamcrest.Matchers.containsString; |
66 | 56 | import static org.hamcrest.Matchers.either; |
67 | 57 | import static org.hamcrest.Matchers.equalTo; |
|
73 | 63 | @TestLogging("_root:DEBUG") |
74 | 64 | public class ZenDiscoveryIT extends ESIntegTestCase { |
75 | 65 |
|
76 | | - @Override |
77 | | - protected Settings nodeSettings(int nodeOrdinal) { |
78 | | - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) |
79 | | - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // Zen1-specific stuff in some tests |
80 | | - .build(); |
81 | | - } |
82 | | - |
83 | 66 | public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception { |
84 | 67 | Settings defaultSettings = Settings.builder() |
85 | 68 | .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") |
@@ -122,7 +105,7 @@ public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Excep |
122 | 105 | assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster)); |
123 | 106 | } |
124 | 107 |
|
125 | | - public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException { |
| 108 | + public void testNodeFailuresAreProcessedOnce() throws IOException { |
126 | 109 | Settings defaultSettings = Settings.builder() |
127 | 110 | .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") |
128 | 111 | .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") |
@@ -161,78 +144,39 @@ public void testNodeFailuresAreProcessedOnce() throws ExecutionException, Interr |
161 | 144 | assertThat(numUpdates.get(), either(equalTo(1)).or(equalTo(2))); // due to batching, both nodes can be handled in same CS update |
162 | 145 | } |
163 | 146 |
|
164 | | - public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { |
165 | | - List<String> nodeNames = internalCluster().startNodes(2); |
166 | | - |
167 | | - List<String> nonMasterNodes = new ArrayList<>(nodeNames); |
168 | | - nonMasterNodes.remove(internalCluster().getMasterName()); |
169 | | - String noneMasterNode = nonMasterNodes.get(0); |
170 | | - |
171 | | - ClusterState state = internalCluster().getInstance(ClusterService.class).state(); |
172 | | - DiscoveryNode node = null; |
173 | | - for (DiscoveryNode discoveryNode : state.nodes()) { |
174 | | - if (discoveryNode.getName().equals(noneMasterNode)) { |
175 | | - node = discoveryNode; |
176 | | - } |
177 | | - } |
178 | | - assert node != null; |
179 | | - |
180 | | - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(state.nodes()) |
181 | | - .add(new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), |
182 | | - emptySet(), Version.CURRENT)).masterNodeId("abc"); |
183 | | - ClusterState.Builder builder = ClusterState.builder(state); |
184 | | - builder.nodes(nodes); |
185 | | - BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.getVersion()); |
186 | | - |
187 | | - final CountDownLatch latch = new CountDownLatch(1); |
188 | | - final AtomicReference<Exception> reference = new AtomicReference<>(); |
189 | | - internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.SEND_ACTION_NAME, |
190 | | - new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { |
191 | | - |
192 | | - @Override |
193 | | - public void handleResponse(TransportResponse.Empty response) { |
194 | | - super.handleResponse(response); |
195 | | - latch.countDown(); |
196 | | - } |
197 | | - |
198 | | - @Override |
199 | | - public void handleException(TransportException exp) { |
200 | | - super.handleException(exp); |
201 | | - reference.set(exp); |
202 | | - latch.countDown(); |
203 | | - } |
204 | | - }); |
205 | | - latch.await(); |
206 | | - assertThat(reference.get(), notNullValue()); |
207 | | - assertThat(ExceptionsHelper.detailedMessage(reference.get()), |
208 | | - containsString("cluster state from a different master than the current one, rejecting")); |
209 | | - } |
210 | | - |
211 | | - public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException { |
212 | | - String masterOnlyNode = internalCluster().startMasterOnlyNode(); |
| 147 | + public void testHandleNodeJoin_incompatibleClusterState() |
| 148 | + throws InterruptedException, ExecutionException, TimeoutException { |
| 149 | + String masterNode = internalCluster().startMasterOnlyNode(); |
213 | 150 | String node1 = internalCluster().startNode(); |
214 | | - ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode); |
215 | 151 | ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); |
| 152 | + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, masterNode); |
216 | 153 | final ClusterState state = clusterService.state(); |
217 | 154 | MetaData.Builder mdBuilder = MetaData.builder(state.metaData()); |
218 | 155 | mdBuilder.putCustom(CustomMetaData.TYPE, new CustomMetaData("data")); |
219 | 156 | ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build(); |
220 | 157 |
|
221 | | - final AtomicReference<IllegalStateException> holder = new AtomicReference<>(); |
| 158 | + final CompletableFuture<Throwable> future = new CompletableFuture<>(); |
222 | 159 | DiscoveryNode node = state.nodes().getLocalNode(); |
223 | | - zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() { |
| 160 | + |
| 161 | + coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()), |
| 162 | + new JoinHelper.JoinCallback() { |
224 | 163 | @Override |
225 | 164 | public void onSuccess() { |
| 165 | + future.completeExceptionally(new AssertionError("onSuccess should not be called")); |
226 | 166 | } |
227 | 167 |
|
228 | 168 | @Override |
229 | 169 | public void onFailure(Exception e) { |
230 | | - holder.set((IllegalStateException) e); |
| 170 | + future.complete(e); |
231 | 171 | } |
232 | 172 | }); |
233 | 173 |
|
234 | | - assertThat(holder.get(), notNullValue()); |
235 | | - assertThat(holder.get().getMessage(), equalTo("failure when sending a validation request to node")); |
| 174 | + Throwable t = future.get(10, TimeUnit.SECONDS); |
| 175 | + |
| 176 | + assertTrue(t instanceof IllegalStateException); |
| 177 | + assertTrue(t.getCause() instanceof RemoteTransportException); |
| 178 | + assertTrue(t.getCause().getCause() instanceof IllegalArgumentException); |
| 179 | + assertThat(t.getCause().getCause().getMessage(), containsString("Unknown NamedWriteable")); |
236 | 180 | } |
237 | 181 |
|
238 | 182 | public static class CustomMetaData extends TestCustomMetaData { |
|
0 commit comments