diff --git a/plugins/module_utils/cm_utils.py b/plugins/module_utils/cm_utils.py index c36679a9..52e4cf5c 100644 --- a/plugins/module_utils/cm_utils.py +++ b/plugins/module_utils/cm_utils.py @@ -417,7 +417,7 @@ def _add_log(err): return _impl - def __init__(self, module): + def __init__(self, module: AnsibleModule): # Set common parameters self.module = module self.url = self.get_param("url", None) diff --git a/plugins/module_utils/parcel_utils.py b/plugins/module_utils/parcel_utils.py index 090376dc..4f962e5c 100644 --- a/plugins/module_utils/parcel_utils.py +++ b/plugins/module_utils/parcel_utils.py @@ -18,6 +18,7 @@ import time +from collections.abc import Callable from enum import IntEnum from cm_client import ( @@ -51,6 +52,7 @@ def __init__( product: str, version: str, cluster: str, + log: Callable[[str, dict], None], delay: int = 15, timeout: int = 600, ) -> None: @@ -60,6 +62,7 @@ def __init__( self.cluster = cluster self.delay = delay self.timeout = timeout + self.log = log self.current = Parcel.STAGE[ str( @@ -85,16 +88,21 @@ def _wait(self, stage: STAGE) -> None: if parcel_status.stage == stage.name: return else: + self.log( + f"[RETRY] Waiting for parcel stage, {stage.name}, for cluster '{self.cluster}': Product {self.product}[{self.version}]" + ) time.sleep(self.delay) - return Exception(f"Failed to reach {stage.name}: timeout ({self.timeout} secs)") + return Exception( + f"Failed to reach parcel stage, {stage.name}: timeout ({self.timeout} secs)" + ) def _exec(self, stage: STAGE, func) -> None: - retries = 0 + end_time = time.time() + self.timeout # Retry the function, i.e. start_distribution, if receiving a 400 error due to # potential "eventual consistency" issues with parcel torrents. - while True: + while end_time > time.time(): try: func( cluster_name=self.cluster, @@ -103,9 +111,11 @@ def _exec(self, stage: STAGE, func) -> None: ) break except ApiException as e: - if retries < 4 and e.status == 400: - retries += 1 - time.sleep(15) + if e.status == 400: + self.log( + f"[RETRY] Attempting to execute parcel function, {func}, for cluster '{self.cluster}': Product {self.product}[{self.version}]" + ) + time.sleep(self.delay) continue else: raise e diff --git a/plugins/modules/cluster.py b/plugins/modules/cluster.py index 34078b51..665ab4f5 100644 --- a/plugins/modules/cluster.py +++ b/plugins/modules/cluster.py @@ -1296,6 +1296,7 @@ def create_cluster_from_parameters(self): product=p, version=v, cluster=self.name, + log=self.module.log, delay=self.delay, timeout=self.timeout, ) diff --git a/plugins/modules/host.py b/plugins/modules/host.py index 23218efc..8ac17d56 100644 --- a/plugins/modules/host.py +++ b/plugins/modules/host.py @@ -160,6 +160,20 @@ - started - stopped - restarted + timeout: + description: + - Timeout, in seconds, before failing when joining a cluster. + type: int + default: 300 + aliases: + - polling_timeout + delay: + description: + - Delay (interval), in seconds, between each attempt. + type: int + default: 15 + aliases: + - polling_interval extends_documentation_fragment: - ansible.builtin.action_common_attributes - cloudera.cluster.cm_options @@ -339,7 +353,7 @@ returned: when supported """ -from time import sleep +import time from cm_client import ( ApiHost, @@ -349,8 +363,6 @@ ClustersResourceApi, HostsResourceApi, HostTemplatesResourceApi, - ParcelResourceApi, - ParcelsResourceApi, RolesResourceApi, ) from cm_client.rest import ApiException @@ -376,9 +388,6 @@ HostMaintenanceStateException, HostException, ) -from ansible_collections.cloudera.cluster.plugins.module_utils.parcel_utils import ( - Parcel, -) from ansible_collections.cloudera.cluster.plugins.module_utils.role_utils import ( parse_role_result, ) @@ -403,6 +412,8 @@ def __init__(self, module): self.skip_redacted = self.get_param("skip_redacted") self.maintenance = self.get_param("maintenance") self.state = self.get_param("state") + self.timeout = self.get_param("timeout") + self.delay = self.get_param("delay") # Initialize the return values self.output = {} @@ -418,8 +429,6 @@ def process(self): cluster_api = ClustersResourceApi(self.api_client) host_api = HostsResourceApi(self.api_client) host_template_api = HostTemplatesResourceApi(self.api_client) - parcels_api = ParcelsResourceApi(self.api_client) - parcel_api = ParcelResourceApi(self.api_client) role_api = RolesResourceApi(self.api_client) current = None @@ -627,10 +636,10 @@ def process(self): self.diff["after"].update(cluster=cluster.name) if not self.module.check_mode: - # Add the host to the cluster (with simple retry) - add_retry = 0 + # Add the host to the cluster + end_time = time.time() + self.timeout - while True: + while end_time > time.time(): try: cluster_api.add_hosts( cluster_name=cluster.name, @@ -645,30 +654,15 @@ def process(self): ) break except ApiException as ae: - if add_retry < 4 and ae.status == 400: - add_retry += 1 - sleep(10) + if ae.status == 400: + self.module.log( + f"[RETRY] Attempting to add host, {current.hostname}, to cluster, {cluster.name}" + ) + time.sleep(self.delay) continue else: raise ae - # parcel_api = ParcelResourceApi(self.api_client) - # try: - # for parcel in parcels_api.read_parcels( - # cluster_name=cluster.name - # ).items: - # if parcel.stage in ["DOWNLOADED", "DISTRIBUTED"]: - # Parcel( - # parcel_api=parcel_api, - # product=parcel.product, - # version=parcel.version, - # cluster=cluster.name, - # ).activate() - # except ApiException as ae: - # self.module.fail_json( - # msg="Error managing parcel states: " + to_native(ae) - # ) - # Handle cluster migration elif current.cluster_ref.cluster_name != cluster.name: self.changed = True @@ -875,6 +869,8 @@ def main(): "restarted", ], ), + timeout=dict(type="int", default=300, aliases=["polling_timeout"]), + delay=dict(type="int", default=15, aliases=["polling_interval"]), ), required_one_of=[ ("name", "host_id"), diff --git a/plugins/modules/parcel.py b/plugins/modules/parcel.py index 155eafe1..2052e13a 100644 --- a/plugins/modules/parcel.py +++ b/plugins/modules/parcel.py @@ -75,6 +75,20 @@ - 'present' - 'absent' required: False + timeout: + description: + - Timeout, in seconds, before failing when changing state, e.g. V(DISTRIBUTED). + type: int + default: 1200 + aliases: + - polling_timeout + delay: + description: + - Delay (interval), in seconds, between each attempt. + type: int + default: 15 + aliases: + - polling_interval extends_documentation_fragment: - ansible.builtin.action_common_attributes - cloudera.cluster.cm_options @@ -213,6 +227,7 @@ def process(self): product=self.parcel_name, version=self.parcel_version, cluster=self.cluster, + log=self.module.log, delay=self.delay, timeout=self.timeout, ) @@ -256,10 +271,10 @@ def main(): name=dict(required=True, aliases=["parcel", "product"]), parcel_version=dict(required=True), delay=dict( - required=False, type="int", default=10, aliases=["polling_interval"] + required=False, type="int", default=15, aliases=["polling_interval"] ), timeout=dict( - required=False, type="int", default=600, aliases=["polling_timeout"] + required=False, type="int", default=1200, aliases=["polling_timeout"] ), state=dict( default="present", diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 0ede0dd5..2e0039b5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -19,6 +19,7 @@ __metaclass__ = type import json +import logging import os import pytest import random @@ -97,6 +98,8 @@ set_role_config_group, ) +LOG = logging.getLogger(__name__) + class NoHostsFoundException(Exception): pass @@ -384,11 +387,15 @@ def base_cluster(cm_api_client, cms_session) -> Generator[ApiCluster]: f"CDH Version {cdh_version} not found. Please check your parcel repo configuration." ) + def _log(msg: str, args: dict = None) -> None: + LOG.log(logging.INFO, msg) + parcel = Parcel( parcel_api=parcel_api, product=cdh_parcel.product, version=cdh_parcel.version, cluster=name, + log=_log, ) parcel.activate()