Skip to content

Commit 5070a70

Browse files
committed
Spark-3214 Fix issue with spark-ec2 not detecting slaves created with 'Launch More Like This' and using Spot Requests
1 parent b92d823 commit 5070a70

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

ec2/spark_ec2.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
# A URL prefix from which to fetch AMI information
4141
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
4242

43-
4443
class UsageError(Exception):
4544
pass
4645

@@ -463,38 +462,45 @@ def launch_cluster(conn, opts, cluster_name):
463462
print "Launched master in %s, regid = %s" % (zone, master_res.id)
464463

465464
# Give the instances descriptive names
466-
# TODO: Add retry logic for tagging with name since it's used to identify a cluster.
467465
for master in master_nodes:
468466
name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
469-
for i in range(0, 5):
470-
try:
471-
master.add_tag(key='Name', value=name)
472-
except:
473-
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
474-
if (i == 5):
475-
raise "Error - failed max attempts to add name tag"
476-
time.sleep(5)
477-
467+
tag_instance(master, name)
478468

479469
for slave in slave_nodes:
480470
name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
481-
for i in range(0, 5):
482-
try:
483-
slave.add_tag(key='Name', value=name)
484-
except:
485-
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
486-
if (i == 5):
487-
raise "Error - failed max attempts to add name tag"
488-
time.sleep(5)
471+
tag_instance(slave, name)
489472

490473
# Return all the instances
491474
return (master_nodes, slave_nodes)
492475

476+
def tag_instance(instance, name):
477+
for i in range(0, 5):
478+
try:
479+
instance.add_tag(key='Name', value=name)
480+
except:
481+
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
482+
if (i == 5):
483+
raise "Error - failed max attempts to add name tag"
484+
time.sleep(5)
493485

494486
# Get the EC2 instances in an existing cluster if available.
495487
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
496488
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
497489
print "Searching for existing cluster " + cluster_name + "..."
490+
# Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
491+
spot_instance_requests = conn.get_all_spot_instance_requests()
492+
for req in spot_instance_requests:
493+
if req.state != u'active':
494+
continue
495+
name = req.tags.get(u'Name', "")
496+
if name.startswith(cluster_name):
497+
reservations = conn.get_all_instances(instance_ids=[req.instance_id])
498+
for res in reservations:
499+
active = [i for i in res.instances if is_active(i)]
500+
for instance in active:
501+
if (instance.tags.get(u'Name') == None):
502+
tag_instance(instance, name)
503+
# Now proceed to detect master and slaves instances.
498504
reservations = conn.get_all_instances()
499505
master_nodes = []
500506
slave_nodes = []
@@ -517,7 +523,6 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
517523
print >> sys.stderr, "ERROR: Could not find any existing cluster"
518524
sys.exit(1)
519525

520-
521526
# Deploy configuration files and run setup scripts on a newly launched
522527
# or started EC2 cluster.
523528
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):

0 commit comments

Comments
 (0)