Skip to content

Commit 126e4cf

Browse files
committed
wait for specific cluster states
1 parent 0d1cc4a commit 126e4cf

File tree

1 file changed

+73
-6
lines changed

1 file changed

+73
-6
lines changed

ec2/spark_ec2.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ def parse_args():
6060
parser.add_option(
6161
"-s", "--slaves", type="int", default=1,
6262
help="Number of slaves to launch (default: %default)")
63-
parser.add_option(
64-
"-w", "--wait", type="int", default=120,
65-
help="Seconds to wait for nodes to start (default: %default)")
63+
# NOTE: For strict "API" compatibility, we should probably leave this in
64+
# and just mark it as deprecated / not used.
65+
# parser.add_option(
66+
# "-w", "--wait", type="int", default=120,
67+
# help="Seconds to wait for nodes to start (default: %default)")
6668
parser.add_option(
6769
"-k", "--key-pair",
6870
help="Key pair to use on instances")
@@ -618,6 +620,55 @@ def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
618620
time.sleep(wait_secs)
619621

620622

623+
def is_ssh_available(host, opts):
624+
"Checks if SSH is available on the host."
625+
try:
626+
with open(os.devnull, 'w') as devnull:
627+
ret = subprocess.check_call(
628+
ssh_command(opts) +
629+
['-t', '-t',
630+
'-o', 'ConnectTimeout=3',
631+
'%s@%s' % (opts.user, host),
632+
stringify_command('true')],
633+
stdout=devnull,
634+
stderr=devnull
635+
)
636+
if ret == 0:
637+
return True
638+
else:
639+
return False
640+
except subprocess.CalledProcessError as e:
641+
return False
642+
643+
644+
def wait_for_cluster_state(cluster_instances, cluster_state, opts):
645+
"""
646+
cluster_instances: a list of boto.ec2.instance.Instance
647+
cluster_state: a string representing the desired state of all the instances in the cluster
648+
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
649+
'running', 'terminated', etc.
650+
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
651+
"""
652+
sys.stdout.write("Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state))
653+
sys.stdout.flush()
654+
while True:
655+
for i in cluster_instances:
656+
s = i.update() # capture output to suppress print to screen in newer versions of boto
657+
# print "{instance}: {state}".format(instance=i.id, state=i.state)
658+
if cluster_state == 'ssh-ready':
659+
if all(i.state == 'running' for i in cluster_instances) and \
660+
all(is_ssh_available(host=i.ip_address, opts=opts) for i in cluster_instances):
661+
print "" # so that next line of output starts on new line
662+
return
663+
else:
664+
if all(i.state == cluster_state for i in cluster_instances):
665+
print "" # so that next line of output starts on new line
666+
return
667+
sys.stdout.write(".")
668+
sys.stdout.flush()
669+
time.sleep(3)
670+
671+
621672
# Get number of local disks available for a given EC2 instance type.
622673
def get_num_disks(instance_type):
623674
# From http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html
@@ -872,7 +923,14 @@ def real_main():
872923
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
873924
else:
874925
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
875-
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
926+
# wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
927+
# NOTE: This next line means if we have a terminally broken cluster, (e.gl for a resume)
928+
# we'll keep waiting until the user exits.
929+
wait_for_cluster_state(
930+
cluster_instances=(master_nodes + slave_nodes),
931+
cluster_state='ssh-ready',
932+
opts=opts
933+
)
876934
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
877935

878936
elif action == "destroy":
@@ -901,7 +959,11 @@ def real_main():
901959
else:
902960
group_names = [opts.security_group_prefix + "-master",
903961
opts.security_group_prefix + "-slaves"]
904-
962+
wait_for_cluster_state(
963+
cluster_instances=(master_nodes + slave_nodes),
964+
cluster_state='terminated',
965+
opts=opts
966+
)
905967
attempt = 1
906968
while attempt <= 3:
907969
print "Attempt %d" % attempt
@@ -987,7 +1049,12 @@ def real_main():
9871049
for inst in master_nodes:
9881050
if inst.state not in ["shutting-down", "terminated"]:
9891051
inst.start()
990-
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
1052+
# wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
1053+
wait_for_cluster_state(
1054+
cluster_instances=(master_nodes + slave_nodes),
1055+
cluster_state='ssh-ready',
1056+
opts=opts
1057+
)
9911058
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
9921059

9931060
else:

0 commit comments

Comments
 (0)