Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a52126b
Set up two ZooKeeper roles (two SERVER roles) in zk_session
wmudge Mar 26, 2025
4b4aeaa
Add hostname to parse_role_result()
wmudge Mar 26, 2025
e0bf780
Update tests to use fixtures and discovered expected results
wmudge Mar 26, 2025
5ef07a0
Add get_cluster_hosts utility function
wmudge Mar 31, 2025
0446ad6
Add family of wait_* functions for different Command types
wmudge Mar 31, 2025
2ae77ef
Add API docs for get_host and get_host_ref utilities
wmudge Mar 31, 2025
a691506
Add filter to read_roles utility function
wmudge Mar 31, 2025
348e7e5
Update to use wait_commands function
wmudge Mar 31, 2025
8fbed28
Update to use updated read_roles utility function
wmudge Mar 31, 2025
8e99991
Add service_register and service_deregister test utilities for creati…
wmudge Mar 31, 2025
67a55da
Add fixture factories for service, role, and role config group
wmudge Mar 31, 2025
4d0ca79
Update to renamed service creation utility
wmudge Mar 31, 2025
ca1a3e2
Add register/deregister functions for use with factory fixtures
wmudge Mar 31, 2025
24f4c27
Use fixture factory utility functions and rename the fixtures
wmudge Mar 31, 2025
61a8f84
Add get_service_hosts utility
wmudge Mar 31, 2025
456e238
Update tests to use fixture factory utilities
wmudge Mar 31, 2025
559964f
Add display_name to base_cluster fixture
wmudge Apr 1, 2025
73d5ee6
Remove unused import
wmudge Apr 1, 2025
da70eb9
Add autouse to zookeeper service fixture
wmudge Apr 1, 2025
d7e6380
Refactor service_role to align with updated logic, utility functions,…
wmudge Apr 1, 2025
68eab86
Add try-except for non-existent role
wmudge Apr 2, 2025
9aa11e7
Remove erroneous imports
wmudge Apr 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion plugins/module_utils/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
normalize_output,
)

from cm_client import ApiCluster
from cm_client import (
ApiClient,
ApiCluster,
ApiHost,
ClustersResourceApi,
)


CLUSTER_OUTPUT = [
Expand All @@ -44,3 +49,13 @@ def parse_cluster_result(cluster: ApiCluster) -> dict:
output = dict(version=cluster.full_version)
output.update(normalize_output(cluster.to_dict(), CLUSTER_OUTPUT))
return output


def get_cluster_hosts(api_client: ApiClient, cluster: ApiCluster) -> list[ApiHost]:
return (
ClustersResourceApi(api_client)
.list_hosts(
cluster_name=cluster.name,
)
.items
)
96 changes: 92 additions & 4 deletions plugins/module_utils/cm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
from ansible.module_utils.common.text.converters import to_native, to_text
from time import sleep
from cm_client import (
ApiBulkCommandList,
ApiClient,
ApiCommand,
ApiCommandList,
ApiConfig,
ApiConfigList,
ApiEntityTag,
Configuration,
)
from cm_client.rest import ApiException, RESTClientObject
Expand All @@ -47,6 +50,43 @@
__maintainer__ = ["[email protected]"]


def wait_bulk_commands(
api_client: ApiClient,
commands: ApiBulkCommandList,
polling: int = 120,
delay: int = 10,
):
if commands.errors:
error_msg = "\n".join(commands.errors)
raise Exception(error_msg)

for cmd in commands.items:
# Serial monitoring
wait_command(api_client, cmd, polling, delay)


def wait_commands(
api_client: ApiClient, commands: ApiCommandList, polling: int = 120, delay: int = 10
):
for cmd in commands.items:
# Serial monitoring
wait_command(api_client, cmd, polling, delay)


def wait_command(
api_client: ApiClient, command: ApiCommand, polling: int = 120, delay: int = 10
):
poll_count = 0
while command.active:
if poll_count > polling:
raise Exception("Command timeout: " + str(command.id))
sleep(delay)
poll_count += 1
command = CommandsResourceApi(api_client).read_command(command.id)
if not command.success:
raise Exception(command.result_message)


def normalize_output(entity: dict, filter: list) -> dict:
output = {}
for k in filter:
Expand Down Expand Up @@ -131,8 +171,8 @@ def resolve_tag_updates(
incoming_tags = {
k: str(v)
for k, v in incoming.items()
if (isinstance(v, str) and v.strip() != "")
or (not isinstance(v, str) and v is not None)
if (type(v) is str and v.strip() != "")
or (type(v) is not str and v is not None)
}

delta_add = {}
Expand All @@ -151,6 +191,29 @@ def resolve_tag_updates(
return (delta_add, delta_del)


class TagUpdates(object):
def __init__(
self, existing: list[ApiEntityTag], updates: dict, purge: bool
) -> None:
(_additions, _deletions) = resolve_tag_updates(
current={t.name: t.value for t in existing},
incoming=updates,
purge=purge,
)

self.diff = dict(
before=_deletions,
after=_additions,
)

self.additions = [ApiEntityTag(k, v) for k, v in _additions.items()]
self.deletions = [ApiEntityTag(k, v) for k, v in _deletions.items()]

@property
def changed(self) -> bool:
return bool(self.additions) or bool(self.deletions)


class ConfigListUpdates(object):
def __init__(self, existing: ApiConfigList, updates: dict, purge: bool) -> None:
current = {r.name: r.value for r in existing.items}
Expand Down Expand Up @@ -507,15 +570,15 @@ def wait_command(self, command: ApiCommand, polling: int = 10, delay: int = 5):
If the command exceeds the polling limit, it fails with a timeout error.
If the command completes unsuccessfully, it fails with the command's result message.

Inputs:
Args:
command (ApiCommand): The command object to monitor.
polling (int, optional): The maximum number of polling attempts before timing out. Default is 10.
delay (int, optional): The time (in seconds) to wait between polling attempts. Default is 5.

Raises:
module.fail_json: If the command times out or fails.

Return:
Returns:
None: The function returns successfully if the command completes and is marked as successful.
"""
poll_count = 0
Expand All @@ -530,6 +593,31 @@ def wait_command(self, command: ApiCommand, polling: int = 10, delay: int = 5):
msg=to_text(command.result_message), command_id=to_text(command.id)
)

def wait_commands(
self, commands: ApiBulkCommandList, polling: int = 10, delay: int = 5
):
"""
Waits for a list of commands to complete, polling each status at regular intervals.
If a command exceeds the polling limit, it fails with a timeout error.
If a command completes unsuccessfully, it fails with the command's result message.

Args:
commands (ApiBulkCommandList): Cloudera Manager bulk commands

Raises:
module.fail_json: If the command times out or fails.

Returns:
none: The function returns successfully if the commands complete and are marked as successful.
"""
if commands.errors:
error_msg = "\n".join(commands.errors)
self.module.fail_json(msg=error_msg)

for c in commands.items:
# Not in parallel...
self.wait_command(c, polling, delay)

@staticmethod
def ansible_module_internal(argument_spec={}, required_together=[], **kwargs):
"""
Expand Down
27 changes: 27 additions & 0 deletions plugins/module_utils/host_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@
def get_host(
api_client: ApiClient, hostname: str = None, host_id: str = None
) -> ApiHost:
"""Retrieve a Host by either hostname or host ID.

Args:
api_client (ApiClient): Cloudera Manager API client.
hostname (str, optional): The cluster hostname. Defaults to None.
host_id (str, optional): The cluster host ID. Defaults to None.

Raises:
ex: ApiException for all non-404 errors.

Returns:
ApiHost: Host object. If not found, returns None.
"""
if hostname:
return next(
(
Expand All @@ -50,7 +63,21 @@ def get_host(
def get_host_ref(
api_client: ApiClient, hostname: str = None, host_id: str = None
) -> ApiHostRef:
"""Retrieve a Host Reference by either hostname or host ID.

Args:
api_client (ApiClient): Cloudera Manager API client.
hostname (str, optional): The cluster hostname. Defaults to None.
host_id (str, optional): The cluster host ID. Defaults to None.

Raises:
ex: ApiException for all non-404 errors.

Returns:
ApiHostRef: Host reference object. If not found, returns None.
"""
host = get_host(api_client, hostname, host_id)

if host is not None:
return ApiHostRef(host.host_id, host.hostname)
else:
Expand Down
Loading
Loading