Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
63d0406
[Discovery] lightweight minimum master node recovery
kimchy Apr 10, 2014
4824f05
[Internal] make no master lock an instance var so it can be configured
kimchy Apr 11, 2014
6ede83a
[Discovery] add rejoin on master gone flag, defaults to false
kimchy Apr 11, 2014
97bdc8f
[Discovery] Make noMasterBlock configurable and added simple test tha…
martijnvg Apr 17, 2014
3cdbb1a
[Discovery] Enable `discovery.zen.rejoin_on_master_gone` setting in D…
martijnvg Apr 17, 2014
549076e
[Discovery] Changed the default for the 'rejoin_on_master_gone' optio…
martijnvg Apr 23, 2014
89a50f6
[Discovery] If available newly elected master node should take over p…
martijnvg Apr 28, 2014
2220c66
[Discovery] Eagerly clean the routing table of shards that exist on n…
martijnvg May 2, 2014
a9aa10a
Updated to use ClusterBlocks new constructor signature
bleskes May 18, 2014
d44bed5
[Internal] Do not execute cluster state changes if current node is no…
bleskes May 16, 2014
2c9ef63
[TEST] It may take a little bit before the unlucky node deals with th…
martijnvg Jun 5, 2014
fc8ae4d
[TEST] Added test that verifies data integrity during and after a sim…
martijnvg Jun 6, 2014
e7d24ec
[TEST] Make sure there no initializing shards when network partition …
martijnvg Jun 6, 2014
4828e78
[TEST] Added test that exposes a shard consistency problem when isola…
martijnvg Jun 11, 2014
424a2f6
[Discovery] Removed METADATA block
martijnvg Jun 11, 2014
1849d09
[Discovery] Made 'discovery.zen.rejoin_on_master_gone' setting updata…
martijnvg Jun 11, 2014
58f8774
[Discovery] do not use versions to optimize cluster state copying for…
bleskes Jun 11, 2014
f3d90cd
[TEST] Remove 'index.routing.allocation.total_shards_per_node' settin…
martijnvg Jun 12, 2014
e39ac7e
[Test] testIsolateMasterAndVerifyClusterStateConsensus didn't wait o…
bleskes Jun 13, 2014
7db9e98
[Discovery] Change (Master|Nodes)FaultDetection's connect_on_network_…
bleskes Jun 14, 2014
8b85d97
[Discovery] Improved logging when a join request is not executed beca…
bleskes Jun 15, 2014
5d13571
[Discovery] when master is gone, flush all pending cluster states
bleskes Jun 17, 2014
28489ce
[Tests] Added ServiceDisruptionScheme(s) and testAckedIndexing
bleskes May 16, 2014
8aed9ee
[TEST] Check if worker if null to prevent NPE on double stopping
martijnvg Jun 19, 2014
785d0e5
[TEST] Reduced failures in DiscoveryWithNetworkFailuresTests#testAcke…
martijnvg Jun 23, 2014
f7b962a
[TEST] Renamed afterDistribution timeout to expectedTimeToHeal
martijnvg Jun 23, 2014
a7a61a0
[Test] ensureStableCluster failed to pass viaNode parameter correctly
bleskes Jun 24, 2014
1af82fd
[Tests] Disabling testAckedIndexing
bleskes Jun 24, 2014
c3e84eb
Fixed compilation issue caused by the lack of a thread pool name
bleskes Jun 26, 2014
98084c0
[TEST] Added test to verify if 'discovery.zen.rejoin_on_master_gone' …
martijnvg Jun 30, 2014
52f69c6
[TEST] Verify no master block during partition for read and write apis
martijnvg Jun 30, 2014
77dae63
[TEST] Make sure get request is always local
martijnvg Jul 1, 2014
5e5f8a9
Added java docs to all tests in DiscoveryWithNetworkFailuresTests
bleskes Jul 2, 2014
e897dcc
[Tests] improved automatic disruption healing after tests
bleskes Jul 2, 2014
d99ca80
[TEST] Properly clear the disruption schemes after test completed.
martijnvg Jul 3, 2014
48c7da1
[Test] testVerifyApiBlocksDuringPartition - wait for stable cluster a…
bleskes Jul 6, 2014
5302a53
[Discovery] immediately start Master|Node fault detection pinging
bleskes Jul 3, 2014
3586e38
[Discovery] Start master fault detection after pingInterval
bleskes Jul 9, 2014
522d4af
[Tests] Use local gateway
bleskes Jul 10, 2014
7b6e194
[Tests] Don't log about restoring a partition if the partition is not…
bleskes Jul 10, 2014
c12d090
[Tests] Increase timeout when waiting for partitions to heal
bleskes Jul 10, 2014
e0543b3
[Internal] Migrate new initial state cluster update task to a Cluster…
bleskes Jul 11, 2014
7fa3d70
[logging] don't log an error if scheduled reroute is rejected because…
bleskes Jul 15, 2014
ccabb4a
Remove unneeded reference to DiscoveryService which potentially cause…
bleskes Jul 16, 2014
ea27837
[Tests] Introduced ClusterDiscoveryConfiguration
bleskes Jul 13, 2014
bebaf97
[Tests] stability improvements
bleskes Jul 14, 2014
f029a24
[Store] migrate non-allocated shard deletion to use ClusterStateNonMa…
bleskes Jul 17, 2014
67685cb
Discovery: If not enough possible masters are found, but there are ma…
martijnvg Jul 17, 2014
5e38e9e
Discovery: Only add local node to possibleMasterNodes if it is a mast…
martijnvg Jul 17, 2014
c2142c0
Discovery: Don't include local node to pingMasters list. We might end…
martijnvg Jul 17, 2014
a409848
[Tests] Fixed some issues with SlowClusterStateProcessing
bleskes Jul 20, 2014
ffcf107
[Discovery] join master after first election
bleskes Jul 18, 2014
cccd060
[Discovery] verify we have a master after a successful join request
bleskes Jul 22, 2014
0244ddb
retry logic to unwrap exception to check for illegal state
kimchy Jul 22, 2014
130e680
[Discovery] Made the handeling of the join request batch oriented.
martijnvg Jul 23, 2014
364374d
[TEST] Added test that verifies that no shard relocations happen duri…
martijnvg Jul 25, 2014
4b8456e
[Discovery] Master fault detection and nodes fault detection should t…
martijnvg Jul 28, 2014
50f852f
[TEST] Added LongGCDisruption and a test simulating GC on master nodes
bleskes Jul 29, 2014
403ebc9
[Discovery] Added cluster version and master node to the nodes fault …
martijnvg Jul 31, 2014
47326ad
[TEST] Make sure all shards are allocated before killing a random dat…
martijnvg Jul 31, 2014
966a55d
Typo: s/Recieved/Received
martijnvg Jul 31, 2014
26d9088
[Transport] Introduced worker threads to prevent alien threads of ent…
bleskes Jul 31, 2014
702890e
[TEST] Remove the forceful `network.mode` setting in DiscoveryWithSe…
martijnvg Jul 31, 2014
c8919e4
[TEST] Changed action names.
martijnvg Aug 5, 2014
5932371
[TEST] Adapt testNoMasterActions since metadata isn't cleared if ther…
martijnvg Aug 5, 2014
ff8b740
[Discovery] add a debug log if a node responds to a publish request a…
bleskes Aug 16, 2014
d5552a9
[Discovery] UnicastZenPing should also ping last known discoNodes
bleskes Aug 19, 2014
183ca37
Code style improvement
bleskes Aug 29, 2014
d159097
[Internal] moved ZenDiscovery setting to use string constants
bleskes Aug 29, 2014
680fb36
[Discovery] Add try/catch around repetitive onSuccess calls
bleskes Aug 29, 2014
ed5b2e0
Add an assertion to ZenDiscovery checking that local node is never el…
bleskes Aug 29, 2014
d8a5ff0
[Internal] introduce ClusterState.UNKNOWN_VERSION constant
bleskes Aug 29, 2014
596a4a0
[Internal] Extract a common base class for (Master|Nodes)FaultDetection
bleskes Aug 29, 2014
34f4ca7
[Cluster] Refactored ClusterStateUpdateTask protection against execut…
bleskes Aug 29, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
468 changes: 239 additions & 229 deletions pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
}

@Override
public void onNoLongerMaster(String source) {
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
}

@Override
public void onFailure(String source, Throwable t) {
//if the reroute fails we only log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}

static class ShardRecoveryRequest extends BroadcastShardOperationRequest {
Expand Down
42 changes: 23 additions & 19 deletions src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
/**
* Constructs a service component for running benchmarks
*
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
*/
@Inject
public BenchmarkService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
Expand All @@ -86,19 +86,22 @@ public BenchmarkService(Settings settings, ClusterService clusterService, Thread
}

@Override
protected void doStart() throws ElasticsearchException { }
protected void doStart() throws ElasticsearchException {
}

@Override
protected void doStop() throws ElasticsearchException { }
protected void doStop() throws ElasticsearchException {
}

@Override
protected void doClose() throws ElasticsearchException { }
protected void doClose() throws ElasticsearchException {
}

/**
* Lists actively running benchmarks on the cluster
*
* @param request Status request
* @param listener Response listener
* @param request Status request
* @param listener Response listener
*/
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {

Expand Down Expand Up @@ -171,8 +174,8 @@ public void onFailure(Throwable t) {
/**
* Executes benchmarks on the cluster
*
* @param request Benchmark request
* @param listener Response listener
* @param request Benchmark request
* @param listener Response listener
*/
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {

Expand Down Expand Up @@ -228,7 +231,7 @@ public void onFailure(Throwable t) {
listener.onFailure(t);
}
}, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) &&
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
}

private final boolean isBenchmarkNode(DiscoveryNode node) {
Expand Down Expand Up @@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) {
}

public abstract T newInstance();

protected abstract void sendResponse();

@Override
Expand Down Expand Up @@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) {

if (bmd != null) {
for (BenchmarkMetaData.Entry entry : bmd.entries()) {
if (request.benchmarkName().equals(entry.benchmarkId())){
if (request.benchmarkName().equals(entry.benchmarkId())) {
if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) {
throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]");
}
Expand Down Expand Up @@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList
@Override
protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
BenchmarkMetaData.State state = entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
if (success) {
return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS);
} else {
Expand All @@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask {
private final String[] patterns;

public AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) {
super("abort_benchmark", null , listener);
super("abort_benchmark", null, listener);
this.patterns = patterns;
}

Expand All @@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
}
}

public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {

private final String reason;
protected final String benchmarkId;
Expand All @@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) {
ImmutableList.Builder<BenchmarkMetaData.Entry> builder = new ImmutableList.Builder<BenchmarkMetaData.Entry>();
for (BenchmarkMetaData.Entry e : bmd.entries()) {
if (benchmarkId == null || match(e)) {
e = process(e) ;
e = process(e);
instances.add(e);
}
// Don't keep finished benchmarks around in cluster state
Expand Down Expand Up @@ -741,7 +745,7 @@ public String reason() {
}
}

public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
protected final R request;

public BenchmarkStateChangeAction(R request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {

private final ActionListener<Response> listener;
private final AckedRequest request;
Expand All @@ -40,6 +40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo

/**
* Called to determine which nodes the acknowledgement is expected from
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
Expand All @@ -50,6 +51,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Returns the tasks that are pending.
*/
List<PendingClusterTask> pendingTasks();

}
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type
}


public static final long UNKNOWN_VERSION = -1;

private final long version;

private final RoutingTable routingTable;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

/**
* This is a marker interface to indicate that the task should be executed
* even if the current node is not a master.
*/
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {

@Override
public boolean runOnlyOnMaster() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,37 @@

package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
* A task that can update the cluster state.
*/
public interface ClusterStateUpdateTask {
abstract public class ClusterStateUpdateTask {

/**
* Update the cluster state based on the current state. Return the *same instance* if no state
* should be changed.
*/
ClusterState execute(ClusterState currentState) throws Exception;
abstract public ClusterState execute(ClusterState currentState) throws Exception;

/**
* A callback called when execute fails.
*/
void onFailure(String source, Throwable t);
abstract public void onFailure(String source, @Nullable Throwable t);


/**
* indicates whether this task should only run if current node is master
*/
public boolean runOnlyOnMaster() {
return true;
}

/**
* called when the task was rejected because the local node is no longer master
*/
public void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

/**
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
*/
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {

@Override
public boolean runOnlyOnMaster() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* the cluster state update has been processed.
*/
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {

/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {

/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}
*/
TimeValue timeout();
abstract public TimeValue timeout();
}
13 changes: 13 additions & 0 deletions src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ public boolean hasGlobalBlock(ClusterBlock block) {
return global.contains(block);
}

public boolean hasGlobalBlock(int blockId) {
for (ClusterBlock clusterBlock : global) {
if (clusterBlock.id() == blockId) {
return true;
}
}
return false;
}

public boolean hasGlobalBlock(ClusterBlockLevel level) {
return global(level).size() > 0;
}

/**
* Is there a global block with the provided status?
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,15 @@ public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).routingResult(routingResult).build();
}

@Override
public void onNoLongerMaster(String source) {
// no biggie
}

@Override
public void onFailure(String source, Throwable t) {
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
ClusterState state = clusterService.state();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure the clusterService.state() can never be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't. many things rely on that fact...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fine! :)

logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
});
routingTableDirty = false;
Expand Down
Loading