Skip to content

Commit 5912ca6

Browse files
nchammas authored and JoshRosen committed
[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states
Instead of waiting arbitrary amounts of time for the cluster to reach a specific state, this patch lets `spark-ec2` explicitly wait for a cluster to reach a desired state. This is useful in a couple of situations: * The cluster is launching and you want to wait until SSH is available before installing stuff. * The cluster is being terminated and you want to wait until all the instances are terminated before trying to delete security groups. This patch removes the need for the `--wait` option and removes some of the time-based retry logic that was being used. Author: Nicholas Chammas <[email protected]> Closes apache#2339 from nchammas/spark-ec2-wait-properly and squashes the following commits: 43a69f0 [Nicholas Chammas] short-circuit SSH check; linear backoff 9a9e035 [Nicholas Chammas] remove extraneous comment 26c5ed0 [Nicholas Chammas] replace print with write() bb67c06 [Nicholas Chammas] deprecate wait option; remove dead code 7969265 [Nicholas Chammas] fix long line (PEP 8) 126e4cf [Nicholas Chammas] wait for specific cluster states
1 parent b32bb72 commit 5912ca6

File tree

1 file changed

+86
-25
lines changed

1 file changed

+86
-25
lines changed

ec2/spark_ec2.py

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import tempfile
3333
import time
3434
import urllib2
35+
import warnings
3536
from optparse import OptionParser
3637
from sys import stderr
3738
import boto
@@ -61,8 +62,8 @@ def parse_args():
6162
"-s", "--slaves", type="int", default=1,
6263
help="Number of slaves to launch (default: %default)")
6364
parser.add_option(
64-
"-w", "--wait", type="int", default=120,
65-
help="Seconds to wait for nodes to start (default: %default)")
65+
"-w", "--wait", type="int",
66+
help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
6667
parser.add_option(
6768
"-k", "--key-pair",
6869
help="Key pair to use on instances")
@@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
195196
return conn.create_security_group(name, "Spark EC2 group")
196197

197198

198-
# Wait for a set of launched instances to exit the "pending" state
199-
# (i.e. either to start running or to fail and be terminated)
200-
def wait_for_instances(conn, instances):
201-
while True:
202-
for i in instances:
203-
i.update()
204-
if len([i for i in instances if i.state == 'pending']) > 0:
205-
time.sleep(5)
206-
else:
207-
return
208-
209-
210199
# Check whether a given EC2 instance object is in a state we consider active,
211200
# i.e. not terminating or terminated. We count both stopping and stopped as
212201
# active since we can restart stopped clusters.
@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
619608
print "Ganglia started at http://%s:5080/ganglia" % master
620609

621610

622-
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
623-
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
624-
print "Waiting for instances to start up..."
625-
time.sleep(5)
626-
wait_for_instances(conn, master_nodes)
627-
wait_for_instances(conn, slave_nodes)
628-
print "Waiting %d more seconds..." % wait_secs
629-
time.sleep(wait_secs)
611+
def is_ssh_available(host, opts):
    """Check whether SSH is available on the given host.

    host: hostname or IP address to probe
    opts: parsed command-line options (provides opts.user and SSH settings)

    Returns True if a trivial remote command succeeds within the
    3-second connection timeout, False otherwise.
    """
    try:
        with open(os.devnull, 'w') as devnull:
            # check_call raises CalledProcessError on ANY nonzero exit
            # status, so if control reaches the return below the SSH
            # probe necessarily succeeded. (The original compared the
            # return value to 0, which was always true here.)
            subprocess.check_call(
                ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
                                     '%s@%s' % (opts.user, host),
                                     stringify_command('true')],
                stdout=devnull,
                stderr=devnull
            )
        return True
    except subprocess.CalledProcessError:
        return False
624+
625+
626+
def is_cluster_ssh_available(cluster_instances, opts):
    """Return True once every instance in the cluster accepts SSH.

    cluster_instances: list of boto.ec2.instance.Instance
    opts: parsed command-line options passed through to is_ssh_available

    An empty instance list is vacuously considered available.
    """
    return all(
        is_ssh_available(host=instance.ip_address, opts=opts)
        for instance in cluster_instances
    )
632+
633+
634+
def wait_for_cluster_state(cluster_instances, cluster_state, opts):
    """
    Block until every instance in the cluster reaches the desired state.

    cluster_instances: a list of boto.ec2.instance.Instance
    cluster_state: a string representing the desired state of all the instances in the cluster
           value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
           'running', 'terminated', etc.
           (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
    """
    sys.stdout.write(
        "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
    )
    sys.stdout.flush()

    attempt = 0
    while True:
        # Linear backoff: no sleep on the first pass, then 3s, 6s, 9s, ...
        time.sleep(3 * attempt)

        for instance in cluster_instances:
            # Assign the result to suppress boto's printing of the state
            # string in newer versions; the value itself is not needed.
            _ = instance.update()

        if cluster_state == 'ssh-ready':
            # 'ssh-ready' is a pseudo-state: all instances must be
            # running AND actually reachable over SSH.
            done = (all(instance.state == 'running'
                        for instance in cluster_instances)
                    and is_cluster_ssh_available(cluster_instances, opts))
        else:
            done = all(instance.state == cluster_state
                       for instance in cluster_instances)

        if done:
            break

        attempt += 1

        sys.stdout.write(".")
        sys.stdout.flush()

    sys.stdout.write("\n")
630669

631670

632671
# Get number of local disks available for a given EC2 instance type.
@@ -868,6 +907,16 @@ def real_main():
868907
(opts, action, cluster_name) = parse_args()
869908

870909
# Input parameter validation
910+
if opts.wait is not None:
911+
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
912+
# To show them, run Python with the -Wdefault switch.
913+
# See: https://docs.python.org/3.5/whatsnew/2.7.html
914+
warnings.warn(
915+
"This option is deprecated and has no effect. "
916+
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
917+
DeprecationWarning
918+
)
919+
871920
if opts.ebs_vol_num > 8:
872921
print >> stderr, "ebs-vol-num cannot be greater than 8"
873922
sys.exit(1)
@@ -890,7 +939,11 @@ def real_main():
890939
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
891940
else:
892941
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
893-
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
942+
wait_for_cluster_state(
943+
cluster_instances=(master_nodes + slave_nodes),
944+
cluster_state='ssh-ready',
945+
opts=opts
946+
)
894947
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
895948

896949
elif action == "destroy":
@@ -919,7 +972,11 @@ def real_main():
919972
else:
920973
group_names = [opts.security_group_prefix + "-master",
921974
opts.security_group_prefix + "-slaves"]
922-
975+
wait_for_cluster_state(
976+
cluster_instances=(master_nodes + slave_nodes),
977+
cluster_state='terminated',
978+
opts=opts
979+
)
923980
attempt = 1
924981
while attempt <= 3:
925982
print "Attempt %d" % attempt
@@ -1019,7 +1076,11 @@ def real_main():
10191076
for inst in master_nodes:
10201077
if inst.state not in ["shutting-down", "terminated"]:
10211078
inst.start()
1022-
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
1079+
wait_for_cluster_state(
1080+
cluster_instances=(master_nodes + slave_nodes),
1081+
cluster_state='ssh-ready',
1082+
opts=opts
1083+
)
10231084
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
10241085

10251086
else:

0 commit comments

Comments (0)