Skip to content

Commit 3cb4e17

Browse files
vidahaJoshRosen
authored andcommitted
Spark-3213 Fixes issue with spark-ec2 not detecting slaves created with "Launch More like this"
... copy the spark_cluster_tag from a spot instance requests over to the instances. Author: Vida Ha <[email protected]> Closes #2163 from vidaha/vida/spark-3213 and squashes the following commits: 5070a70 [Vida Ha] Spark-3214 Fix issue with spark-ec2 not detecting slaves created with 'Launch More Like This' and using Spot Requests (cherry picked from commit 7faf755) Signed-off-by: Josh Rosen <[email protected]>
1 parent 90f8f3e commit 3cb4e17

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

ec2/spark_ec2.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
# A URL prefix from which to fetch AMI information
4141
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
4242

43-
4443
class UsageError(Exception):
4544
pass
4645

@@ -450,38 +449,45 @@ def launch_cluster(conn, opts, cluster_name):
450449
print "Launched master in %s, regid = %s" % (zone, master_res.id)
451450

452451
# Give the instances descriptive names
453-
# TODO: Add retry logic for tagging with name since it's used to identify a cluster.
454452
for master in master_nodes:
455453
name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
456-
for i in range(0, 5):
457-
try:
458-
master.add_tag(key='Name', value=name)
459-
except:
460-
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
461-
if (i == 5):
462-
raise "Error - failed max attempts to add name tag"
463-
time.sleep(5)
464-
454+
tag_instance(master, name)
465455

466456
for slave in slave_nodes:
467457
name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
468-
for i in range(0, 5):
469-
try:
470-
slave.add_tag(key='Name', value=name)
471-
except:
472-
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
473-
if (i == 5):
474-
raise "Error - failed max attempts to add name tag"
475-
time.sleep(5)
458+
tag_instance(slave, name)
476459

477460
# Return all the instances
478461
return (master_nodes, slave_nodes)
479462

463+
def tag_instance(instance, name):
464+
for i in range(0, 5):
465+
try:
466+
instance.add_tag(key='Name', value=name)
467+
except:
468+
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
469+
if (i == 5):
470+
raise "Error - failed max attempts to add name tag"
471+
time.sleep(5)
480472

481473
# Get the EC2 instances in an existing cluster if available.
482474
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
483475
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
484476
print "Searching for existing cluster " + cluster_name + "..."
477+
# Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
478+
spot_instance_requests = conn.get_all_spot_instance_requests()
479+
for req in spot_instance_requests:
480+
if req.state != u'active':
481+
continue
482+
name = req.tags.get(u'Name', "")
483+
if name.startswith(cluster_name):
484+
reservations = conn.get_all_instances(instance_ids=[req.instance_id])
485+
for res in reservations:
486+
active = [i for i in res.instances if is_active(i)]
487+
for instance in active:
488+
if (instance.tags.get(u'Name') == None):
489+
tag_instance(instance, name)
490+
# Now proceed to detect master and slaves instances.
485491
reservations = conn.get_all_instances()
486492
master_nodes = []
487493
slave_nodes = []
@@ -504,7 +510,6 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
504510
print >> sys.stderr, "ERROR: Could not find any existing cluster"
505511
sys.exit(1)
506512

507-
508513
# Deploy configuration files and run setup scripts on a newly launched
509514
# or started EC2 cluster.
510515
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):

0 commit comments

Comments
 (0)