Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"


class UsageError(Exception):
    """Exception subclass that adds no behaviour of its own.

    NOTE(review): presumably raised for command-line usage mistakes; the
    raise sites are not visible in this part of the file -- confirm.
    """

Expand Down Expand Up @@ -463,38 +462,45 @@ def launch_cluster(conn, opts, cluster_name):
print "Launched master in %s, regid = %s" % (zone, master_res.id)

# Give the instances descriptive names
# TODO: Add retry logic for tagging with name since it's used to identify a cluster.
for master in master_nodes:
name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
for i in range(0, 5):
try:
master.add_tag(key='Name', value=name)
except:
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
if (i == 5):
raise "Error - failed max attempts to add name tag"
time.sleep(5)

tag_instance(master, name)

for slave in slave_nodes:
name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
for i in range(0, 5):
try:
slave.add_tag(key='Name', value=name)
except:
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
if (i == 5):
raise "Error - failed max attempts to add name tag"
time.sleep(5)
tag_instance(slave, name)

# Return all the instances
return (master_nodes, slave_nodes)

def tag_instance(instance, name, attempts=5, delay=5):
    """Set the 'Name' tag on an instance, retrying when tagging fails.

    Calls ``instance.add_tag(key='Name', value=name)`` and returns as soon
    as one call succeeds.  After a failed call a message is printed and, if
    attempts remain, the function sleeps for ``delay`` seconds and retries.

    Args:
        instance: object exposing ``add_tag(key=..., value=...)``.
        name: value to store under the 'Name' tag.
        attempts: maximum number of ``add_tag`` calls (default 5).  With a
            value below 1 no call is made and the function returns quietly.
        delay: seconds to sleep between attempts (default 5).

    Raises:
        RuntimeError: if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            instance.add_tag(key='Name', value=name)
        except Exception:
            # Single-argument parenthesised print behaves the same as a
            # Python 2 print statement and as the Python 3 function.
            print("Failed attempt %i of %i to tag %s" % (attempt, attempts, name))
            if attempt == attempts:
                # Out of retries: surface the failure instead of silently
                # leaving the instance without its name tag.
                raise RuntimeError("Error - failed max attempts to add name tag")
            time.sleep(delay)
        else:
            # Tag applied; stop here so the instance is not re-tagged.
            return

# Get the EC2 instances in an existing cluster if available.
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
# Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
spot_instance_requests = conn.get_all_spot_instance_requests()
for req in spot_instance_requests:
if req.state != u'active':
continue
name = req.tags.get(u'Name', "")
if name.startswith(cluster_name):
reservations = conn.get_all_instances(instance_ids=[req.instance_id])
for res in reservations:
active = [i for i in res.instances if is_active(i)]
for instance in active:
if (instance.tags.get(u'Name') == None):
tag_instance(instance, name)
# Now proceed to detect master and slaves instances.
reservations = conn.get_all_instances()
master_nodes = []
slave_nodes = []
Expand All @@ -517,7 +523,6 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1)


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
Expand Down