Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import time
import urllib2
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr
import boto
Expand Down Expand Up @@ -624,7 +625,9 @@ def setup_spark_cluster(master, opts):


def is_ssh_available(host, opts):
"Checks if SSH is available on the host."
"""
Check if SSH is available on a host.
"""
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
Expand All @@ -639,36 +642,48 @@ def is_ssh_available(host, opts):


def is_cluster_ssh_available(cluster_instances, opts):
"""
Check if SSH is available on all the instances in a cluster.
"""
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
else:
return True


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
"""
Wait for all the instances in the cluster to reach a designated state.

cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
"Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is capturing the output not required anymore ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it never was.

When I originally added this in, I was testing in the Python shell and got confused. i.update() echoes its return value to the shell, so I thought I needed to suppress that output. But as part of a program, nothing gets echoed. We don't need the return value, so there's no need to capture it.

It's like just typing var in the shell and seeing its value. As part of a program, var alone will do nothing.


start_time = datetime.now()

num_attempts = 0
conn = ec2.connect_to_region(opts.region)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to be passing around conn across functions in the existing script. Any reason to not do that and create a new one here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll fix that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


while True:
time.sleep(3 * num_attempts)
time.sleep(5 * num_attempts) # seconds

for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto
i.update()

statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])

if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
all(s.system_status.status == 'ok' for s in statuses) and \
all(s.instance_status.status == 'ok' for s in statuses) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
Expand All @@ -682,6 +697,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):

sys.stdout.write("\n")

end_time = datetime.now()
print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
s=cluster_state,
t=(end_time - start_time).seconds
)


# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
Expand Down Expand Up @@ -930,7 +951,7 @@ def real_main():
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
"spark-ec2 automatically waits as long as necessary for clusters to start up.",
DeprecationWarning
)

Expand All @@ -957,9 +978,10 @@ def real_main():
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)

Expand Down Expand Up @@ -990,9 +1012,10 @@ def real_main():
group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"]
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
cluster_state='terminated'
)
attempt = 1
while attempt <= 3:
Expand Down Expand Up @@ -1094,9 +1117,10 @@ def real_main():
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)

Expand Down