Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ class ClusterConfiguration {
* In case of more than one node, this defaults to the number of nodes
*/
@Input
Closure<Integer> minimumMasterNodes = { getNumNodes() > 1 ? getNumNodes() : -1 }
Closure<Integer> minimumMasterNodes = {
if (bwcVersion != null && bwcVersion.before("6.5.0")) {
return numNodes > 1 ? numNodes : -1
} else {
return numNodes > 1 ? numNodes.intdiv(2) + 1 : -1
}
}

@Input
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') +
Expand Down Expand Up @@ -104,6 +110,14 @@ class ClusterConfiguration {
return seedNode.transportUri()
}

/**
* A closure to call which returns a manually supplied list of unicast seed hosts.
*/
@Input
Closure<List<String>> otherUnicastHostAddresses = {
Collections.emptyList()
}

/**
* A closure to call before the cluster is considered ready. The closure is passed the node info,
* as well as a groovy AntBuilder, to enable running ant condition checks. The default wait
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,31 @@ class ClusterFormationTasks {
}
NodeInfo node = new NodeInfo(config, i, project, prefix, elasticsearchVersion, sharedDir)
nodes.add(node)
Object dependsOn = startTasks.empty ? startDependencies : startTasks.get(0)
startTasks.add(configureNode(project, prefix, runner, dependsOn, node, config, distro, nodes.get(0)))
Closure<Map> writeConfigSetup
Object dependsOn
if (node.nodeVersion.onOrAfter("6.5.0")) {
writeConfigSetup = { Map esConfig ->
// Don't force discovery provider if one is set by the test cluster specs already
if (esConfig.containsKey('discovery.zen.hosts_provider') == false) {
esConfig['discovery.zen.hosts_provider'] = 'file'
}
esConfig['discovery.zen.ping.unicast.hosts'] = []
esConfig
}
dependsOn = startDependencies
} else {
dependsOn = startTasks.empty ? startDependencies : startTasks.get(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why we are picking up these dependencies on older versions only ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this is because we don't have the file based discovery in the main codebase in earlier versions so we don't use it.

writeConfigSetup = { Map esConfig ->
String unicastTransportUri = node.config.unicastTransportUri(nodes.get(0), node, project.createAntBuilder())
if (unicastTransportUri == null) {
esConfig['discovery.zen.ping.unicast.hosts'] = []
} else {
esConfig['discovery.zen.ping.unicast.hosts'] = "\"${unicastTransportUri}\""
}
esConfig
}
}
startTasks.add(configureNode(project, prefix, runner, dependsOn, node, config, distro, writeConfigSetup))
}

Task wait = configureWaitTask("${prefix}#wait", project, nodes, startTasks, config.nodeStartupWaitSeconds)
Expand Down Expand Up @@ -184,7 +207,7 @@ class ClusterFormationTasks {
* @return a task which starts the node.
*/
static Task configureNode(Project project, String prefix, Task runner, Object dependsOn, NodeInfo node, ClusterConfiguration config,
Configuration distribution, NodeInfo seedNode) {
Configuration distribution, Closure<Map> writeConfig) {

// tasks are chained so their execution order is maintained
Task setup = project.tasks.create(name: taskName(prefix, node, 'clean'), type: Delete, dependsOn: dependsOn) {
Expand All @@ -200,7 +223,7 @@ class ClusterFormationTasks {
setup = configureCheckPreviousTask(taskName(prefix, node, 'checkPrevious'), project, setup, node)
setup = configureStopTask(taskName(prefix, node, 'stopPrevious'), project, setup, node)
setup = configureExtractTask(taskName(prefix, node, 'extract'), project, setup, node, distribution)
setup = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, setup, node, seedNode)
setup = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, setup, node, writeConfig)
setup = configureCreateKeystoreTask(taskName(prefix, node, 'createKeystore'), project, setup, node)
setup = configureAddKeystoreSettingTasks(prefix, project, setup, node)
setup = configureAddKeystoreFileTasks(prefix, project, setup, node)
Expand Down Expand Up @@ -303,7 +326,7 @@ class ClusterFormationTasks {
}

/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, Closure<Map> configFilter) {
Map esConfig = [
'cluster.name' : node.clusterName,
'node.name' : "node-" + node.nodeNum,
Expand All @@ -317,7 +340,7 @@ class ClusterFormationTasks {
if (minimumMasterNodes > 0) {
esConfig['discovery.zen.minimum_master_nodes'] = minimumMasterNodes
}
if (node.config.numNodes > 1) {
if (minimumMasterNodes > 1) {
// don't wait for state.. just start up quickly
// this will also allow new and old nodes in the BWC case to become the master
esConfig['discovery.initial_state_timeout'] = '0s'
Expand Down Expand Up @@ -347,10 +370,7 @@ class ClusterFormationTasks {

Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
writeConfig.doFirst {
String unicastTransportUri = node.config.unicastTransportUri(seedNode, node, project.ant)
if (unicastTransportUri != null) {
esConfig['discovery.zen.ping.unicast.hosts'] = "\"${unicastTransportUri}\""
}
esConfig = configFilter.call(esConfig)
File configFile = new File(node.pathConf, 'elasticsearch.yml')
logger.info("Configuring ${configFile}")
configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8')
Expand Down Expand Up @@ -681,6 +701,20 @@ class ClusterFormationTasks {
static Task configureWaitTask(String name, Project project, List<NodeInfo> nodes, List<Task> startTasks, int waitSeconds) {
Task wait = project.tasks.create(name: name, dependsOn: startTasks)
wait.doLast {

Collection<String> unicastHosts = new HashSet<>()
nodes.forEach { node ->
unicastHosts.addAll(node.config.otherUnicastHostAddresses.call())
String unicastHost = node.config.unicastTransportUri(node, null, project.createAntBuilder())
if (unicastHost != null) {
unicastHosts.addAll(Arrays.asList(unicastHost.split(",")))
}
}
String unicastHostsTxt = String.join("\n", unicastHosts)
nodes.forEach { node ->
node.pathConf.toPath().resolve("unicast_hosts.txt").setText(unicastHostsTxt)
}

ant.waitfor(maxwait: "${waitSeconds}", maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
or {
for (NodeInfo node : nodes) {
Expand Down Expand Up @@ -867,9 +901,10 @@ class ClusterFormationTasks {
outputPrintStream: outputStream,
messageOutputLevel: org.apache.tools.ant.Project.MSG_INFO)

project.ant.project.addBuildListener(listener)
Object retVal = command(project.ant)
project.ant.project.removeBuildListener(listener)
AntBuilder ant = project.createAntBuilder()
ant.project.addBuildListener(listener)
Object retVal = command(ant)
ant.project.removeBuildListener(listener)
return retVal
}

Expand Down
13 changes: 6 additions & 7 deletions qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ for (Version version : bwcVersions.wireCompatible) {
mustRunAfter(precommit)
}

Object extension = extensions.findByName("${baseName}#oldClusterTestCluster")
configure(extensions.findByName("${baseName}#oldClusterTestCluster")) {
bwcVersion = version
numBwcNodes = 3
Expand All @@ -76,12 +75,12 @@ for (Version version : bwcVersions.wireCompatible) {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
}

Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure unicastSeed ->
Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure getOtherUnicastHostAddresses ->
configure(extensions.findByName("${baseName}#${name}")) {
dependsOn lastRunner, "${baseName}#oldClusterTestCluster#node${stopNode}.stop"
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> unicastSeed() }
minimumMasterNodes = { 3 }
otherUnicastHostAddresses = { getOtherUnicastHostAddresses() }
minimumMasterNodes = { 2 }
/* Override the data directory so the new node always gets the node we
* just stopped's data directory. */
dataDir = { nodeNumber -> oldClusterTest.nodes[stopNode].dataDir }
Expand All @@ -95,7 +94,7 @@ for (Version version : bwcVersions.wireCompatible) {

configureUpgradeCluster("oneThirdUpgradedTestCluster", oldClusterTestRunner, 0,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oldClusterTest.nodes.get(1).transportUri() + ',' + oldClusterTest.nodes.get(2).transportUri() })
{ [oldClusterTest.nodes.get(1).transportUri(), oldClusterTest.nodes.get(2).transportUri()] })

Task oneThirdUpgradedTestRunner = tasks.getByName("${baseName}#oneThirdUpgradedTestRunner")
oneThirdUpgradedTestRunner.configure {
Expand All @@ -109,7 +108,7 @@ for (Version version : bwcVersions.wireCompatible) {

configureUpgradeCluster("twoThirdsUpgradedTestCluster", oneThirdUpgradedTestRunner, 1,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oldClusterTest.nodes.get(2).transportUri() + ',' + oneThirdUpgradedTest.nodes.get(0).transportUri() })
{ [oldClusterTest.nodes.get(2).transportUri(), oneThirdUpgradedTest.nodes.get(0).transportUri()] })

Task twoThirdsUpgradedTestRunner = tasks.getByName("${baseName}#twoThirdsUpgradedTestRunner")
twoThirdsUpgradedTestRunner.configure {
Expand All @@ -123,7 +122,7 @@ for (Version version : bwcVersions.wireCompatible) {

configureUpgradeCluster("upgradedClusterTestCluster", twoThirdsUpgradedTestRunner, 2,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oneThirdUpgradedTest.nodes.get(0).transportUri() + ',' + twoThirdsUpgradedTest.nodes.get(0).transportUri() })
{ [oneThirdUpgradedTest.nodes.get(0).transportUri(), twoThirdsUpgradedTest.nodes.get(0).transportUri()] })

Task upgradedClusterTestRunner = tasks.getByName("${baseName}#upgradedClusterTestRunner")
upgradedClusterTestRunner.configure {
Expand Down
14 changes: 6 additions & 8 deletions x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ subprojects {
mustRunAfter(precommit)
}

Object extension = extensions.findByName("${baseName}#oldClusterTestCluster")
configure(extensions.findByName("${baseName}#oldClusterTestCluster")) {
dependsOn copyTestNodeKeystore
if (version.before('6.3.0')) {
Expand All @@ -131,7 +130,6 @@ subprojects {
bwcVersion = version
numBwcNodes = 3
numNodes = 3
minimumMasterNodes = { 3 }
clusterName = 'rolling-upgrade'
waitCondition = waitWithAuth
setting 'xpack.monitoring.exporters._http.type', 'http'
Expand Down Expand Up @@ -184,13 +182,13 @@ subprojects {
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
}

Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure unicastSeed ->
Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure getOtherUnicastHostAddresses ->
configure(extensions.findByName("${baseName}#${name}")) {
dependsOn lastRunner, "${baseName}#oldClusterTestCluster#node${stopNode}.stop"
setupCommand 'setupTestUser', 'bin/elasticsearch-users', 'useradd', 'test_user', '-p', 'x-pack-test-password', '-r', 'superuser'
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> unicastSeed() }
minimumMasterNodes = { 3 }
otherUnicastHostAddresses = { getOtherUnicastHostAddresses() }
minimumMasterNodes = { 2 }
/* Override the data directory so the new node always gets the node we
* just stopped's data directory. */
dataDir = { nodeNumber -> oldClusterTest.nodes[stopNode].dataDir }
Expand Down Expand Up @@ -226,7 +224,7 @@ subprojects {

configureUpgradeCluster("oneThirdUpgradedTestCluster", oldClusterTestRunner, 0,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oldClusterTest.nodes.get(1).transportUri() + ',' + oldClusterTest.nodes.get(2).transportUri() })
{ [oldClusterTest.nodes.get(1).transportUri(), oldClusterTest.nodes.get(2).transportUri()] })

Task oneThirdUpgradedTestRunner = tasks.getByName("${baseName}#oneThirdUpgradedTestRunner")
oneThirdUpgradedTestRunner.configure {
Expand All @@ -246,7 +244,7 @@ subprojects {

configureUpgradeCluster("twoThirdsUpgradedTestCluster", oneThirdUpgradedTestRunner, 1,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oldClusterTest.nodes.get(2).transportUri() + ',' + oneThirdUpgradedTest.nodes.get(0).transportUri() })
{ [oldClusterTest.nodes.get(2).transportUri(), oneThirdUpgradedTest.nodes.get(0).transportUri()] })

Task twoThirdsUpgradedTestRunner = tasks.getByName("${baseName}#twoThirdsUpgradedTestRunner")
twoThirdsUpgradedTestRunner.configure {
Expand All @@ -260,7 +258,7 @@ subprojects {

configureUpgradeCluster("upgradedClusterTestCluster", twoThirdsUpgradedTestRunner, 2,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ oneThirdUpgradedTest.nodes.get(0).transportUri() + ',' + twoThirdsUpgradedTest.nodes.get(0).transportUri() })
{ [oneThirdUpgradedTest.nodes.get(0).transportUri(), twoThirdsUpgradedTest.nodes.get(0).transportUri()] })

Task upgradedClusterTestRunner = tasks.getByName("${baseName}#upgradedClusterTestRunner")
upgradedClusterTestRunner.configure {
Expand Down