Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/module_utils/cm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _add_log(err):

return _impl

def __init__(self, module):
def __init__(self, module: AnsibleModule):
# Set common parameters
self.module = module
self.url = self.get_param("url", None)
Expand Down
22 changes: 16 additions & 6 deletions plugins/module_utils/parcel_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import time

from collections.abc import Callable
from enum import IntEnum

from cm_client import (
Expand Down Expand Up @@ -51,6 +52,7 @@ def __init__(
product: str,
version: str,
cluster: str,
log: Callable[[str, dict], None],
delay: int = 15,
timeout: int = 600,
) -> None:
Expand All @@ -60,6 +62,7 @@ def __init__(
self.cluster = cluster
self.delay = delay
self.timeout = timeout
self.log = log

self.current = Parcel.STAGE[
str(
Expand All @@ -85,16 +88,21 @@ def _wait(self, stage: STAGE) -> None:
if parcel_status.stage == stage.name:
return
else:
self.log(
f"[RETRY] Waiting for parcel stage, {stage.name}, for cluster '{self.cluster}': Product {self.product}[{self.version}]"
)
time.sleep(self.delay)

return Exception(f"Failed to reach {stage.name}: timeout ({self.timeout} secs)")
return Exception(
f"Failed to reach parcel stage, {stage.name}: timeout ({self.timeout} secs)"
)

def _exec(self, stage: STAGE, func) -> None:
retries = 0
end_time = time.time() + self.timeout

# Retry the function, e.g. start_distribution, if receiving a 400 error due to
# potential "eventual consistency" issues with parcel torrents.
while True:
while end_time > time.time():
try:
func(
cluster_name=self.cluster,
Expand All @@ -103,9 +111,11 @@ def _exec(self, stage: STAGE, func) -> None:
)
break
except ApiException as e:
if retries < 4 and e.status == 400:
retries += 1
time.sleep(15)
if e.status == 400:
self.log(
f"[RETRY] Attempting to execute parcel function, {func}, for cluster '{self.cluster}': Product {self.product}[{self.version}]"
)
time.sleep(self.delay)
continue
else:
raise e
Expand Down
1 change: 1 addition & 0 deletions plugins/modules/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ def create_cluster_from_parameters(self):
product=p,
version=v,
cluster=self.name,
log=self.module.log,
delay=self.delay,
timeout=self.timeout,
)
Expand Down
58 changes: 27 additions & 31 deletions plugins/modules/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@
- started
- stopped
- restarted
timeout:
description:
- Timeout, in seconds, before failing when joining a cluster.
type: int
default: 300
aliases:
- polling_timeout
delay:
description:
- Delay (interval), in seconds, between each attempt.
type: int
default: 15
aliases:
- polling_interval
extends_documentation_fragment:
- ansible.builtin.action_common_attributes
- cloudera.cluster.cm_options
Expand Down Expand Up @@ -339,7 +353,7 @@
returned: when supported
"""

from time import sleep
import time

from cm_client import (
ApiHost,
Expand All @@ -349,8 +363,6 @@
ClustersResourceApi,
HostsResourceApi,
HostTemplatesResourceApi,
ParcelResourceApi,
ParcelsResourceApi,
RolesResourceApi,
)
from cm_client.rest import ApiException
Expand All @@ -376,9 +388,6 @@
HostMaintenanceStateException,
HostException,
)
from ansible_collections.cloudera.cluster.plugins.module_utils.parcel_utils import (
Parcel,
)
from ansible_collections.cloudera.cluster.plugins.module_utils.role_utils import (
parse_role_result,
)
Expand All @@ -403,6 +412,8 @@ def __init__(self, module):
self.skip_redacted = self.get_param("skip_redacted")
self.maintenance = self.get_param("maintenance")
self.state = self.get_param("state")
self.timeout = self.get_param("timeout")
self.delay = self.get_param("delay")

# Initialize the return values
self.output = {}
Expand All @@ -418,8 +429,6 @@ def process(self):
cluster_api = ClustersResourceApi(self.api_client)
host_api = HostsResourceApi(self.api_client)
host_template_api = HostTemplatesResourceApi(self.api_client)
parcels_api = ParcelsResourceApi(self.api_client)
parcel_api = ParcelResourceApi(self.api_client)
role_api = RolesResourceApi(self.api_client)

current = None
Expand Down Expand Up @@ -627,10 +636,10 @@ def process(self):
self.diff["after"].update(cluster=cluster.name)

if not self.module.check_mode:
# Add the host to the cluster (with simple retry)
add_retry = 0
# Add the host to the cluster
end_time = time.time() + self.timeout

while True:
while end_time > time.time():
try:
cluster_api.add_hosts(
cluster_name=cluster.name,
Expand All @@ -645,30 +654,15 @@ def process(self):
)
break
except ApiException as ae:
if add_retry < 4 and ae.status == 400:
add_retry += 1
sleep(10)
if ae.status == 400:
self.module.log(
f"[RETRY] Attempting to add host, {current.hostname}, to cluster, {cluster.name}"
)
time.sleep(self.delay)
continue
else:
raise ae

# parcel_api = ParcelResourceApi(self.api_client)
# try:
# for parcel in parcels_api.read_parcels(
# cluster_name=cluster.name
# ).items:
# if parcel.stage in ["DOWNLOADED", "DISTRIBUTED"]:
# Parcel(
# parcel_api=parcel_api,
# product=parcel.product,
# version=parcel.version,
# cluster=cluster.name,
# ).activate()
# except ApiException as ae:
# self.module.fail_json(
# msg="Error managing parcel states: " + to_native(ae)
# )

# Handle cluster migration
elif current.cluster_ref.cluster_name != cluster.name:
self.changed = True
Expand Down Expand Up @@ -875,6 +869,8 @@ def main():
"restarted",
],
),
timeout=dict(type="int", default=300, aliases=["polling_timeout"]),
delay=dict(type="int", default=15, aliases=["polling_interval"]),
),
required_one_of=[
("name", "host_id"),
Expand Down
19 changes: 17 additions & 2 deletions plugins/modules/parcel.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@
- 'present'
- 'absent'
required: False
timeout:
description:
- Timeout, in seconds, before failing when changing state, e.g. V(DISTRIBUTED).
type: int
default: 1200
aliases:
- polling_timeout
delay:
description:
- Delay (interval), in seconds, between each attempt.
type: int
default: 15
aliases:
- polling_interval
extends_documentation_fragment:
- ansible.builtin.action_common_attributes
- cloudera.cluster.cm_options
Expand Down Expand Up @@ -213,6 +227,7 @@ def process(self):
product=self.parcel_name,
version=self.parcel_version,
cluster=self.cluster,
log=self.module.log,
delay=self.delay,
timeout=self.timeout,
)
Expand Down Expand Up @@ -256,10 +271,10 @@ def main():
name=dict(required=True, aliases=["parcel", "product"]),
parcel_version=dict(required=True),
delay=dict(
required=False, type="int", default=10, aliases=["polling_interval"]
required=False, type="int", default=15, aliases=["polling_interval"]
),
timeout=dict(
required=False, type="int", default=600, aliases=["polling_timeout"]
required=False, type="int", default=1200, aliases=["polling_timeout"]
),
state=dict(
default="present",
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
__metaclass__ = type

import json
import logging
import os
import pytest
import random
Expand Down Expand Up @@ -97,6 +98,8 @@
set_role_config_group,
)

LOG = logging.getLogger(__name__)


class NoHostsFoundException(Exception):
pass
Expand Down Expand Up @@ -384,11 +387,15 @@ def base_cluster(cm_api_client, cms_session) -> Generator[ApiCluster]:
f"CDH Version {cdh_version} not found. Please check your parcel repo configuration."
)

def _log(msg: str, args: dict = None) -> None:
    """Emit ``msg`` at INFO level through the module-level ``LOG`` logger.

    ``args`` is accepted so the callable matches the two-argument logging
    signature expected by ``Parcel``; it is not used.
    """
    LOG.info(msg)

parcel = Parcel(
parcel_api=parcel_api,
product=cdh_parcel.product,
version=cdh_parcel.version,
cluster=name,
log=_log,
)

parcel.activate()
Expand Down
Loading