Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,6 @@ boolean processNextCommittedClusterState(String reason) {

final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
final ClusterState currentState = committedState.get();
final ClusterState adaptedNewClusterState;
// all pending states have been processed
if (newClusterState == null) {
return false;
Expand Down Expand Up @@ -773,54 +772,23 @@ boolean processNextCommittedClusterState(String reason) {
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
adaptedNewClusterState = newClusterState;
} else if (newClusterState.nodes().isLocalNodeElectedMaster() == false) {
// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(newClusterState);

// if the routing table did not change, use the original one
if (newClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
// same for metadata
if (newClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : newClusterState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.getIndex());
if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.getIndexUUID()) &&
currentIndexMetaData.getVersion() == indexMetaData.getVersion()) {
// safe to reuse
metaDataBuilder.put(currentIndexMetaData, false);
} else {
metaDataBuilder.put(indexMetaData, false);
}
}
builder.metaData(metaDataBuilder);
}

adaptedNewClusterState = builder.build();
} else {
adaptedNewClusterState = newClusterState;
}

if (currentState == adaptedNewClusterState) {
if (currentState == newClusterState) {
return false;
}

committedState.set(adaptedNewClusterState);
committedState.set(newClusterState);

// update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
// and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
if (adaptedNewClusterState.nodes().isLocalNodeElectedMaster()) {
if (newClusterState.nodes().isLocalNodeElectedMaster()) {
// update the set of nodes to ping
nodesFD.updateNodesAndPing(adaptedNewClusterState);
nodesFD.updateNodesAndPing(newClusterState);
} else {
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(adaptedNewClusterState.nodes().getMasterNode())) {
masterFD.restart(adaptedNewClusterState.nodes().getMasterNode(),
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
masterFD.restart(newClusterState.nodes().getMasterNode(),
"new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand All @@ -43,6 +44,8 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -154,4 +157,58 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception {
assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue());
}

private ClusterState updateUsingSerialisedDiff(ClusterState original, Diff<ClusterState> diff) throws IOException {
BytesStreamOutput outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
diff.writeTo(outStream);
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
diff = ClusterState.readDiffFrom(inStream, newNode("node-name"));
return diff.apply(original);
}

public void testObjectReuseWhenApplyingClusterStateDiff() throws Exception {
IndexMetaData indexMetaData
= IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1).build();
IndexTemplateMetaData indexTemplateMetaData
= IndexTemplateMetaData.builder("test-template").patterns(new ArrayList<>()).build();
MetaData metaData = MetaData.builder().put(indexMetaData, true).put(indexTemplateMetaData).build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();

ClusterState clusterState1 = ClusterState.builder(new ClusterName("clusterName1"))
.metaData(metaData).routingTable(routingTable).build();
BytesStreamOutput outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
clusterState1.writeTo(outStream);
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
ClusterState serializedClusterState1 = ClusterState.readFrom(inStream, newNode("node4"));

// Create a new, albeit equal, IndexMetadata object
ClusterState clusterState2 = ClusterState.builder(clusterState1).incrementVersion()
.metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(1).build(), true)).build();
assertNotSame("Should have created a new, equivalent, IndexMetaData object in clusterState2",
clusterState1.metaData().index("test"), clusterState2.metaData().index("test"));

ClusterState serializedClusterState2 = updateUsingSerialisedDiff(serializedClusterState1, clusterState2.diff(clusterState1));
assertSame("Unchanged metadata should not create new IndexMetaData objects",
serializedClusterState1.metaData().index("test"), serializedClusterState2.metaData().index("test"));
assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
serializedClusterState1.routingTable().index("test"), serializedClusterState2.routingTable().index("test"));

// Create a new and different IndexMetadata object
ClusterState clusterState3 = ClusterState.builder(clusterState1).incrementVersion()
.metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(2).build(), true)).build();
ClusterState serializedClusterState3 = updateUsingSerialisedDiff(serializedClusterState2, clusterState3.diff(clusterState2));
assertNotEquals("Should have a new IndexMetaData object",
serializedClusterState2.metaData().index("test"), serializedClusterState3.metaData().index("test"));
assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
serializedClusterState2.routingTable().index("test"), serializedClusterState3.routingTable().index("test"));

assertSame("nodes", serializedClusterState2.nodes(), serializedClusterState3.nodes());
assertSame("blocks", serializedClusterState2.blocks(), serializedClusterState3.blocks());
assertSame("template", serializedClusterState2.metaData().templates().get("test-template"),
serializedClusterState3.metaData().templates().get("test-template"));
}
}