From 827a37b7301a2cadab29a92c21b0bb2a74fd9dd6 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Thu, 16 Nov 2023 20:54:21 -0500 Subject: [PATCH 1/7] Add module and action for cloudera.cluster.assemble_cluster_template Signed-off-by: Webster Mudge --- plugins/action/assemble_cluster_template.py | 179 +++++++++++++++ plugins/modules/assemble_cluster_template.py | 224 +++++++++++++++++++ 2 files changed, 403 insertions(+) create mode 100644 plugins/action/assemble_cluster_template.py create mode 100644 plugins/modules/assemble_cluster_template.py diff --git a/plugins/action/assemble_cluster_template.py b/plugins/action/assemble_cluster_template.py new file mode 100644 index 00000000..b2ac5781 --- /dev/null +++ b/plugins/action/assemble_cluster_template.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2023 Cloudera, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import tempfile + +from ansible import constants as C +from ansible.errors import ( + AnsibleAction, + AnsibleError, + _AnsibleActionDone, + AnsibleActionFail, +) +from ansible.module_utils.common.text.converters import to_native, to_text +from ansible.module_utils.parsing.convert_bool import boolean +from ansible.plugins.action import ActionBase +from ansible.utils.hashing import checksum_s + + +class ActionModule(ActionBase): + TRANSFERS_FILES = True + + def process_fragment(self, fh) -> bytes: + updated = bytearray(fh.read()) + updated.extend("\n-------\n".encode()) + return updated + + def complete_assembly(self, assembled_file): + pass + + def _assemble_fragments( + self, assembled_file, src_path, regex=None, ignore_hidden=True + ): + # By file name sort order + for f in ( + to_text(p, errors="surrogate_or_strict") + for p in sorted(os.listdir(src_path)) + ): + # Filter by regexp + if regex and not regex.search(f): + continue + + # Read and process the fragment + fragment = os.path.join(src_path, f) + if not os.path.isfile(fragment) or ( + ignore_hidden and os.path.basename(fragment).startswith(".") + ): + continue + + with open(self._loader.get_real_file(fragment), "rb") as fragment_file: + content = self.process_fragment(fragment_file) + + # Write the resulting bytes + if content is not None: + assembled_file.write(content) + + # Finalize any remaining assembly + self.complete_assembly(assembled_file) + + # Close the assembled file handle + assembled_file.close() + + def run(self, tmp=None, task_vars=None): + self._supports_check_mode = False + + result = super(ActionModule, self).run(tmp, task_vars) + + del tmp # legacy + if task_vars is None: + task_vars = dict() + + src = self._task.args.get("src", None) + dest = self._task.args.get("dest", None) + remote_src = self._task.args.get("remote_src", False) + regexp = self._task.args.get("regexp", None) + follow = self._task.args.get("follow", False) + ignore_hidden = self._task.args.get("ignore_hidden", True) + + try: + if src is None or dest is None: + raise AnsibleActionFail("src and dest are required") + + if boolean(remote_src, strict=False): + result.update( + self._execute_module( + module_name="cloudera.cluster.assemble_cluster_template", + task_vars=task_vars, + ) + ) + raise _AnsibleActionDone() + else: + try: + src = self._find_needle("files", src) + except AnsibleError as e: + raise AnsibleActionFail(to_native(e)) + + if not os.path.isdir(src): + raise AnsibleActionFail(f"Source, {src}, is not a directory") + + compiled = None + if regexp is not None: + compiled = re.compile(regexp) + + with tempfile.NamedTemporaryFile( + dir=C.DEFAULT_LOCAL_TMP, delete=False + ) as assembled: + self._assemble_fragments( + assembled, src, regex=compiled, ignore_hidden=ignore_hidden + ) + + assembled_checksum = checksum_s(assembled.name) + dest = self._remote_expand_user(dest) + dest_stat = self._execute_remote_stat( + dest, all_vars=task_vars, follow=follow + ) + + new_module_args = self._task.args.copy() + + # Purge cloudera.cluster.assemble_cluster_template-specific options + for o in ["remote_src", "regexp", "ignore_hidden"]: + new_module_args.pop(o, None) + + new_module_args.update(dest=dest) + + diff = {} + if assembled_checksum != dest_stat["checksum"]: + if self._task.diff: + diff = self._get_diff_data(dest, assembled.name, task_vars) + + remote_path = self._connection._shell.join_path( + self._connection._shell.tmpdir, "assembled_cluster_template" + ) + transfered = self._transfer_file(assembled.name, remote_path) + + self._fixup_perms2((self._connection._shell.tmpdir, remote_path)) + + new_module_args.update( + dict( + src=transfered, + ) + ) + + copy = self._execute_module( + module_name="ansible.legacy.copy", + module_args=new_module_args, + task_vars=task_vars, + ) + + if diff: + copy.update(diff=diff) + result.update(copy) + else: + file = self._execute_module( + module_name="ansible.legacy.file", + module_args=new_module_args, + task_vars=task_vars, + ) + result.update(file) + except AnsibleAction as e: + result.update(e.result) + finally: + self._remove_tmp_path(self._connection._shell.tmpdir) + + return result diff --git a/plugins/modules/assemble_cluster_template.py b/plugins/modules/assemble_cluster_template.py new file mode 100644 index 00000000..3df34e2a --- /dev/null +++ b/plugins/modules/assemble_cluster_template.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2023 Cloudera, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ANSIBLE_METADATA = { + "metadata_version": "1.1", + "status": ["preview"], + "supported_by": "community", +} + +DOCUMENTATION = r""" +--- +module: assemble_cluster_template +short_description: Merge Cloudera Manager cluster template fragments +description: + - Merge Cloudera Manager cluster template fragment files into a single JSON file. + - The module supports C(check_mode). +author: + - "Webster Mudge (@wmudge)" + - "Ronald Suplina (@rsuplina)" + - "Jim Enright (@jenright)" + - "Andre Araujo (@asdaraujo)" +options: + method: + description: + - HTTP method for the CM API endpoint path. + type: str + required: True + choices: + - DELETE + - POST + - PUT + body: + description: + - HTTP body for the CM API endpoint call. + type: dict +extends_documentation_fragment: + - action_common_attributes + - action_common_attributes.flow + - action_common_attributes.files + - decrypt + - files +""" + +EXAMPLES = r""" +--- +""" + +RETURN = r""" +--- +""" + +import os +import re +import tempfile + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.text.converters import to_native + + +class AssembleClusterTemplate(object): + def __init__(self, module): + self.module = module + + # Set parameters + self.src = self.module.params["src"] + self.dest = self.module.params["dest"] + self.backup = self.module.params["backup"] + self.remote_src = self.module.params["remote_src"] + self.regexp = self.module.params["regexp"] + self.ignore_hidden = self.module.params["ignore_hidden"] + + self.unsafe_writes = self.module.params["unsafe_writes"] + self.file_perms = self.module.load_file_common_arguments(self.module.params) + + # Initialize the return values + self.output = {} + self.changed = False + + # Initialize internal values + self.compiled = None + + # Execute the logic + self.process() + + def process_fragment(self, fh) -> bytes: + updated = bytearray(fh.read()) + updated.extend("\n-------\n".encode()) + return updated + + def complete_assembly(self, assembled_file): + pass + + def _assemble_fragments(self, assembled_file): + # By file name sort order + for f in sorted(os.listdir(self.src)): + # Filter by regexp + if self.compiled and not self.compiled.search(f): + continue + + # Read and process the fragment + fragment = os.path.join(self.src, f) + if not os.path.isfile(fragment) or ( + self.ignore_hidden and os.path.basename(fragment).startswith(".") + ): + continue + + with open(fragment, "rb") as fragment_file: + content = self.process_fragment(fragment_file) + + # Write the resulting bytes + if content is not None: + assembled_file.write(content) + + # Finalize any remaining assembly + self.complete_assembly(assembled_file) + + # Close the assembled file handle + assembled_file.close() + + def process(self): + # Check source + if not os.path.exists(self.src): + self.module.fail_json(msg=f"Source, {self.src}, does not exist") + elif not os.path.isdir(self.src): + self.module.fail_json(msg=f"Source, {self.src}, is not a directory") + + # Compile filter expression + if self.regexp is not None: + try: + self.compiled = re.compile(self.regexp) + except re.error as e: + self.module.fail_json( + msg=f"Regular expression, {self.regexp} is invalid: {to_native(e)}" + ) + + # Assemble fragments + with tempfile.NamedTemporaryFile( + dir=self.module.tmpdir, delete=False + ) as assembled: + # Process fragments into temporary file + self._assemble_fragments(assembled) + + # Confirm the assembled file is closed + if not assembled.closed: + self.module.fail_json( + msg=f"Assembled file, {assembled.name}, not closed after fragment processing" + ) + + # Generate hashes for assembled file + assembled_sha1 = self.module.sha1(assembled.name) + self.output.update(checksum=assembled_sha1) + + try: + md5 = self.module.md5(assembled.name) + except ValueError: + md5 = None + self.output.update(md5sum=md5) + + # Move to destination + dest_sha1 = None + if os.path.exists(self.dest): + dest_sha1 = self.module.sha1(self.dest) + + if assembled_sha1 != dest_sha1: + if self.backup and dest_sha1 is not None: + self.output.update( + backup_file=self.module.backup_local(self.dest) + ) + + self.module.atomic_move( + assembled.name, self.dest, unsafe_writes=self.unsafe_writes + ) + + self.changed = True + + # Notify file permissions + self.changed = self.module.set_fs_attributes_if_different( + self.file_perms, self.changed + ) + + # Finalize output + self.output.update(msg="OK") + + +def main(): + module = AnsibleModule( + argument_spec=dict( + src=dict(required=True, type="path"), + dest=dict(required=True, type="path"), + backup=dict(type="bool", default=False), + remote_src=dict(type="bool", default=False), + regexp=dict(type="str", aliases=["filter"]), + ignore_hidden=dict(type="bool", default=True), + ), + add_file_common_args=True, + supports_check_mode=True, + ) + + result = AssembleClusterTemplate(module) + + output = dict( + changed=result.changed, + **result.output, + ) + + module.exit_json(**output) + + +if __name__ == "__main__": + main() From c300c6888915342b10830c34d41df33ad7c392fb Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Thu, 16 Nov 2023 23:27:51 -0500 Subject: [PATCH 2/7] Update assemble_cluster_template to handle JSON and process idempotent and unique keys Signed-off-by: Webster Mudge --- plugins/action/assemble_cluster_template.py | 105 ++++++++++++++++---- 1 file changed, 86 insertions(+), 19 deletions(-) diff --git a/plugins/action/assemble_cluster_template.py b/plugins/action/assemble_cluster_template.py index b2ac5781..28a0c564 100644 --- a/plugins/action/assemble_cluster_template.py +++ b/plugins/action/assemble_cluster_template.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import os import re import tempfile @@ -35,16 +36,72 @@ class ActionModule(ActionBase): TRANSFERS_FILES = True - def process_fragment(self, fh) -> bytes: - updated = bytearray(fh.read()) - updated.extend("\n-------\n".encode()) - return updated + MERGED = {} + IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"] + UNIQUE_IDS = ["repositories"] + + def update_object(self, base, template, breadcrumbs=""): + if isinstance(base, dict) and isinstance(template, dict): + self.update_dict(base, template, breadcrumbs) + return True + elif isinstance(base, list) and isinstance(template, list): + self.update_list(base, template, breadcrumbs) + return True + return False + + def update_dict(self, base, template, breadcrumbs=""): + for key, value in template.items(): + crumb = breadcrumbs + "/" + key + + if key in self.IDEMPOTENT_IDS: + if base[key] != value: + self._display.error( + "Objects with distinct IDs should not be merged: " + crumb + ) + continue - def complete_assembly(self, assembled_file): - pass + if key not in base: + base[key] = value + elif not self.update_object(base[key], value, crumb) and base[key] != value: + self._display.warning( + f"Value being overwritten for key [{crumb}]], Old: [{base[key]}], New: [{value}]" + ) + base[key] = value + + if key in self.UNIQUE_IDS: + base[key] = list(set(base[key])) + + def update_list(self, base, template, breadcrumbs=""): + for item in template: + if isinstance(item, dict): + for attr in self.IDEMPOTENT_IDS: + if attr in item: + idempotent_id = attr + break + else: + idempotent_id = None + if idempotent_id: + namesake = [ + i for i in base if i[idempotent_id] == item[idempotent_id] + ] + if namesake: + # LOG.warn("List item being replaced, Old: [%s], New: [%s]", namesake[0], item) + self.update_dict( + namesake[0], + item, + breadcrumbs + + "/[" + + idempotent_id + + "=" + + item[idempotent_id] + + "]", + ) + continue + base.append(item) + base.sort(key=lambda x: json.dumps(x, sort_keys=True)) def _assemble_fragments( - self, assembled_file, src_path, regex=None, ignore_hidden=True + self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True ): # By file name sort order for f in ( @@ -62,15 +119,20 @@ def _assemble_fragments( ): continue - with open(self._loader.get_real_file(fragment), "rb") as fragment_file: - content = self.process_fragment(fragment_file) - - # Write the resulting bytes - if content is not None: - assembled_file.write(content) + with open( + self._loader.get_real_file(fragment, decrypt=decrypt), + "r", + encoding="utf-8", + ) as fragment_file: + try: + self.update_object(self.MERGED, json.loads(fragment_file.read())) + except json.JSONDecodeError as e: + raise AnsibleActionFail( + message=f"{to_text(e.msg)}", obj=to_native(e) + ) - # Finalize any remaining assembly - self.complete_assembly(assembled_file) + # Write out the final assembly + json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False) # Close the assembled file handle assembled_file.close() @@ -89,7 +151,8 @@ def run(self, tmp=None, task_vars=None): remote_src = self._task.args.get("remote_src", False) regexp = self._task.args.get("regexp", None) follow = self._task.args.get("follow", False) - ignore_hidden = self._task.args.get("ignore_hidden", True) + ignore_hidden = boolean(self._task.args.get("ignore_hidden", True)) + decrypt = self._task.args.pop("decrypt", True) try: if src is None or dest is None: @@ -117,10 +180,14 @@ def run(self, tmp=None, task_vars=None): compiled = re.compile(regexp) with tempfile.NamedTemporaryFile( - dir=C.DEFAULT_LOCAL_TMP, delete=False + mode="w", encoding="utf-8", dir=C.DEFAULT_LOCAL_TMP, delete=False ) as assembled: self._assemble_fragments( - assembled, src, regex=compiled, ignore_hidden=ignore_hidden + assembled, + src, + regex=compiled, + ignore_hidden=ignore_hidden, + decrypt=decrypt, ) assembled_checksum = checksum_s(assembled.name) @@ -132,7 +199,7 @@ def run(self, tmp=None, task_vars=None): new_module_args = self._task.args.copy() # Purge cloudera.cluster.assemble_cluster_template-specific options - for o in ["remote_src", "regexp", "ignore_hidden"]: + for o in ["remote_src", "regexp", "filter", "ignore_hidden", "decrypt", ]: new_module_args.pop(o, None) new_module_args.update(dest=dest) From c23344b4ff7c75088ea3ce369009a074c09f5dc7 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Thu, 16 Nov 2023 23:28:08 -0500 Subject: [PATCH 3/7] Update assemble_cluster_template documentation Signed-off-by: Webster Mudge --- plugins/modules/assemble_cluster_template.py | 111 +++++++++++++++---- 1 file changed, 88 insertions(+), 23 deletions(-) diff --git a/plugins/modules/assemble_cluster_template.py b/plugins/modules/assemble_cluster_template.py index 3df34e2a..2a5f507d 100644 --- a/plugins/modules/assemble_cluster_template.py +++ b/plugins/modules/assemble_cluster_template.py @@ -26,42 +26,107 @@ module: assemble_cluster_template short_description: Merge Cloudera Manager cluster template fragments description: - - Merge Cloudera Manager cluster template fragment files into a single JSON file. - - The module supports C(check_mode). + - Merge multiple Cloudera Manager cluster template files into a single cluster template file. + - Often a cluster template file is composed of several services, host templates, + and other parameters from multiple sources and/or configurations. + M(cloudera.cluster.assemble_cluster_template) will take a directory of + cluster template configuration files that can be local or have already been + transferred to the system and merge them together to produce a single, + composite cluster template configuration file. + - Files are merged in string sorting order. +version_added: "4.2.0" author: - "Webster Mudge (@wmudge)" - "Ronald Suplina (@rsuplina)" - "Jim Enright (@jenright)" - "Andre Araujo (@asdaraujo)" options: - method: + src: description: - - HTTP method for the CM API endpoint path. - type: str + - An already existing directory of cluster template files. + type: path + required: True + aliases: + - cluster_template_src + dest: + description: + - A file to create using the merger of all of the cluster template files. + type: path required: True - choices: - - DELETE - - POST - - PUT - body: + aliases: + - cluster_template + backup: + description: + - Create a backup file if V(true). + - The backup file name includes a timestamp. + type: bool + default: False + remote_src: + description: + - Flag to control the location of the cluster template configuration source files. + - If V(false), search for I(src) on the controller. + - If V(true), search for I(src) on the remote/target. + type: bool + default: False + regexp: description: - - HTTP body for the CM API endpoint call. - type: dict + - Merge files only if the given regular expression matches the filename. + - If not set, all files within C(src) are merged. + - Every V(\\) (backslash) must be escaped as V(\\\\) to conform to YAML syntax. + - See L(Python regular expressions,https://docs.python.org/3/library/re.html). + type: str + aliases: + - filter + ignore_hidden: + description: + - Flag whether to include files that begin with a '.'. + type: bool + default: True +attributes: + action: + support: full + async: + support: none + bypass_host_loop: + support: none + check_mode: + support: none + diff_mode: + support: full + platform: + platforms: posix + safe_file_operations: + support: full + vault: + support: full +seealso: + - module: ansible.builtin.assemble + - module: ansible.builtin.copy + - module: ansible.builtin.template + - module: ansible.windows.win_copy extends_documentation_fragment: - - action_common_attributes - - action_common_attributes.flow - - action_common_attributes.files - - decrypt - - files + - action_common_attributes + - action_common_attributes.flow + - action_common_attributes.files + - decrypt + - files """ EXAMPLES = r""" --- +- name: Assemble a cluster template from files (on the controller) + cloudera.cluster.assemble_cluster_template: + src: examples + dest: /opt/cloudera/cluster-template.json + +- name: Assemble a cluster template from selected files (on the controller) + cloudera.cluster.assemble_cluster_template: + src: examples + dest: /opt/cloudera/cluster-template.json + regexp: "base|nifi" """ -RETURN = r""" ---- -""" +RETURN = r"""#""" import os import re @@ -199,15 +264,15 @@ def process(self): def main(): module = AnsibleModule( argument_spec=dict( - src=dict(required=True, type="path"), - dest=dict(required=True, type="path"), + src=dict(required=True, type="path", aliases=["cluster_template_src"]), + dest=dict(required=True, type="path", aliases=["cluster_template"]), backup=dict(type="bool", default=False), remote_src=dict(type="bool", default=False), regexp=dict(type="str", aliases=["filter"]), ignore_hidden=dict(type="bool", default=True), ), add_file_common_args=True, - supports_check_mode=True, + supports_check_mode=False, ) result = AssembleClusterTemplate(module) From c3e5093c34e2a2253e978e0706645f2afc16f642 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Fri, 17 Nov 2023 12:19:33 -0500 Subject: [PATCH 4/7] Update to handle aliases, temp file cleanup, documentation for assemble_cluster_template action plugin Signed-off-by: Webster Mudge --- plugins/action/assemble_cluster_template.py | 137 ++++++++++++-------- 1 file changed, 86 insertions(+), 51 deletions(-) diff --git a/plugins/action/assemble_cluster_template.py b/plugins/action/assemble_cluster_template.py index 28a0c564..fd63561c 100644 --- a/plugins/action/assemble_cluster_template.py +++ b/plugins/action/assemble_cluster_template.py @@ -85,7 +85,6 @@ def update_list(self, base, template, breadcrumbs=""): i for i in base if i[idempotent_id] == item[idempotent_id] ] if namesake: - # LOG.warn("List item being replaced, Old: [%s], New: [%s]", namesake[0], item) self.update_dict( namesake[0], item, @@ -100,7 +99,7 @@ def update_list(self, base, template, breadcrumbs=""): base.append(item) base.sort(key=lambda x: json.dumps(x, sort_keys=True)) - def _assemble_fragments( + def assemble_fragments( self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True ): # By file name sort order @@ -128,14 +127,15 @@ def _assemble_fragments( self.update_object(self.MERGED, json.loads(fragment_file.read())) except json.JSONDecodeError as e: raise AnsibleActionFail( - message=f"{to_text(e.msg)}", obj=to_native(e) + message=f"JSON parsing error: {to_text(e.msg)}", + obj=to_native(e), ) # Write out the final assembly json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False) - # Close the assembled file handle - assembled_file.close() + # Flush the assembled file handle + assembled_file.flush() def run(self, tmp=None, task_vars=None): self._supports_check_mode = False @@ -146,18 +146,29 @@ def run(self, tmp=None, task_vars=None): if task_vars is None: task_vars = dict() + # Handle aliases src = self._task.args.get("src", None) + if src is None: + src = self._task.args.get("cluster_template_src", None) + dest = self._task.args.get("dest", None) - remote_src = self._task.args.get("remote_src", False) + if dest is None: + dest = self._task.args.get("cluster_template") + regexp = self._task.args.get("regexp", None) - follow = self._task.args.get("follow", False) + if regexp is None: + regexp = self._task.args.get("filter", None) + + remote_src = boolean(self._task.args.get("remote_src", False)) + follow = boolean(self._task.args.get("follow", False)) ignore_hidden = boolean(self._task.args.get("ignore_hidden", True)) decrypt = self._task.args.pop("decrypt", True) try: if src is None or dest is None: - raise AnsibleActionFail("src and dest are required") + raise AnsibleActionFail("Both 'src' and 'dest' are required") + # If src files are on the remote host, run the module if boolean(remote_src, strict=False): result.update( self._execute_module( @@ -175,14 +186,16 @@ def run(self, tmp=None, task_vars=None): if not os.path.isdir(src): raise AnsibleActionFail(f"Source, {src}, is not a directory") + # Compile the regexp compiled = None if regexp is not None: compiled = re.compile(regexp) + # Assemble the src files into output file with tempfile.NamedTemporaryFile( - mode="w", encoding="utf-8", dir=C.DEFAULT_LOCAL_TMP, delete=False + mode="w", encoding="utf-8", dir=C.DEFAULT_LOCAL_TMP ) as assembled: - self._assemble_fragments( + self.assemble_fragments( assembled, src, regex=compiled, @@ -190,54 +203,76 @@ def run(self, tmp=None, task_vars=None): decrypt=decrypt, ) - assembled_checksum = checksum_s(assembled.name) - dest = self._remote_expand_user(dest) - dest_stat = self._execute_remote_stat( - dest, all_vars=task_vars, follow=follow - ) - - new_module_args = self._task.args.copy() - - # Purge cloudera.cluster.assemble_cluster_template-specific options - for o in ["remote_src", "regexp", "filter", "ignore_hidden", "decrypt", ]: - new_module_args.pop(o, None) + # Gather the checksums for assembled file and destination file + assembled_checksum = checksum_s(assembled.name) - new_module_args.update(dest=dest) - - diff = {} - if assembled_checksum != dest_stat["checksum"]: - if self._task.diff: - diff = self._get_diff_data(dest, assembled.name, task_vars) - - remote_path = self._connection._shell.join_path( - self._connection._shell.tmpdir, "assembled_cluster_template" + dest = self._remote_expand_user(dest) + dest_stat = self._execute_remote_stat( + dest, all_vars=task_vars, follow=follow ) - transfered = self._transfer_file(assembled.name, remote_path) - self._fixup_perms2((self._connection._shell.tmpdir, remote_path)) + # Prepare the task arguments for the called submodules + submodule_args = self._task.args.copy() + + # Purge non-submodule arguments + for o in [ + "cluster_template_src", + "cluster_template", + "remote_src", + "regexp", + "filter", + "ignore_hidden", + "decrypt", + ]: + submodule_args.pop(o, None) + + # Update the 'dest' arg + submodule_args.update(dest=dest) + + if assembled_checksum != dest_stat["checksum"]: + diff = {} + + if self._task.diff: + diff = self._get_diff_data(dest, assembled.name, task_vars) + + # Define a temporary remote path for the remote copy + remote_path = self._connection._shell.join_path( + self._connection._shell.tmpdir, "assembled_cluster_template" + ) + + # Transfer the file to the remote path + transfered = self._transfer_file(assembled.name, remote_path) - new_module_args.update( - dict( - src=transfered, + # Update the file permissions on the remote file + self._fixup_perms2((self._connection._shell.tmpdir, remote_path)) + + # Update the 'src' arg with the temporary remote file + submodule_args.update( + dict( + src=transfered, + ) ) - ) - copy = self._execute_module( - module_name="ansible.legacy.copy", - module_args=new_module_args, - task_vars=task_vars, - ) + # Execute the copy + copy = self._execute_module( + module_name="ansible.legacy.copy", + module_args=submodule_args, + task_vars=task_vars, + ) - if diff: - copy.update(diff=diff) - result.update(copy) - else: - file = self._execute_module( - module_name="ansible.legacy.file", - module_args=new_module_args, - task_vars=task_vars, - ) - result.update(file) + if diff: + copy.update(diff=diff) + + result.update(copy) + else: + # Gather details on the existing file + file = self._execute_module( + module_name="ansible.legacy.file", + module_args=submodule_args, + task_vars=task_vars, + ) + + result.update(file) except AnsibleAction as e: result.update(e.result) finally: From acd69dd85e4028072d03db2c44e15dae3c77fdf5 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Fri, 17 Nov 2023 13:05:21 -0500 Subject: [PATCH 5/7] Add error handling and formatting for assemble_cluster_template Signed-off-by: Webster Mudge --- plugins/action/assemble_cluster_template.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/plugins/action/assemble_cluster_template.py b/plugins/action/assemble_cluster_template.py index fd63561c..655682be 100644 --- a/plugins/action/assemble_cluster_template.py +++ b/plugins/action/assemble_cluster_template.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - # Copyright 2023 Cloudera, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -189,7 +186,12 @@ def run(self, tmp=None, task_vars=None): # Compile the regexp compiled = None if regexp is not None: - compiled = re.compile(regexp) + try: + compiled = re.compile(regexp) + except re.error as e: + raise AnsibleActionFail( + message=f"Regular expression, {regexp}, is invalid: {to_native(e)}" + ) # Assemble the src files into output file with tempfile.NamedTemporaryFile( @@ -231,7 +233,7 @@ def run(self, tmp=None, task_vars=None): if assembled_checksum != dest_stat["checksum"]: diff = {} - + if self._task.diff: diff = self._get_diff_data(dest, assembled.name, task_vars) @@ -239,7 +241,7 @@ def run(self, tmp=None, task_vars=None): remote_path = self._connection._shell.join_path( self._connection._shell.tmpdir, "assembled_cluster_template" ) - + # Transfer the file to the remote path transfered = self._transfer_file(assembled.name, remote_path) @@ -262,7 +264,7 @@ def run(self, tmp=None, task_vars=None): if diff: copy.update(diff=diff) - + result.update(copy) else: # Gather details on the existing file @@ -271,7 +273,7 @@ def run(self, tmp=None, task_vars=None): module_args=submodule_args, task_vars=task_vars, ) - + result.update(file) except AnsibleAction as e: result.update(e.result) From 4418710087264362886add9f8f862b5ff3a69608 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Fri, 17 Nov 2023 13:05:54 -0500 Subject: [PATCH 6/7] Add merge logic to assemble_cluster_template module plugin Signed-off-by: Webster Mudge --- plugins/modules/assemble_cluster_template.py | 126 +++++++++++++------ 1 file changed, 90 insertions(+), 36 deletions(-) diff --git a/plugins/modules/assemble_cluster_template.py b/plugins/modules/assemble_cluster_template.py index 2a5f507d..7710c6d5 100644 --- a/plugins/modules/assemble_cluster_template.py +++ b/plugins/modules/assemble_cluster_template.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - # Copyright 2023 Cloudera, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,6 +41,7 @@ src: description: - An already existing directory of cluster template files. + - TODO Local or remote type: path required: True aliases: @@ -118,16 +116,23 @@ cloudera.cluster.assemble_cluster_template: src: examples dest: /opt/cloudera/cluster-template.json - + - name: Assemble a cluster template from selected files (on the controller) cloudera.cluster.assemble_cluster_template: src: examples dest: /opt/cloudera/cluster-template.json regexp: "base|nifi" + +- name: Assemble a cluster template from files on the target host + cloudera.cluster.assemble_cluster_template: + src: /tmp/examples + dest: /opt/cloudera/cluster-template.json + remote_src: yes """ RETURN = r"""#""" +import json import os import re import tempfile @@ -137,9 +142,13 @@ class AssembleClusterTemplate(object): + MERGED = {} + IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"] + UNIQUE_IDS = ["repositories"] + def __init__(self, module): - self.module = module - + self.module = module + # Set parameters self.src = self.module.params["src"] self.dest = self.module.params["dest"] @@ -161,15 +170,66 @@ def __init__(self, module): # Execute the logic self.process() - def process_fragment(self, fh) -> bytes: - updated = bytearray(fh.read()) - updated.extend("\n-------\n".encode()) - return updated - - def complete_assembly(self, assembled_file): - pass + def update_object(self, base, template, breadcrumbs=""): + if isinstance(base, dict) and isinstance(template, dict): + self.update_dict(base, template, breadcrumbs) + return True + elif isinstance(base, list) and isinstance(template, list): + self.update_list(base, template, breadcrumbs) + return True + return False + + def update_dict(self, base, template, breadcrumbs=""): + for key, value in template.items(): + crumb = breadcrumbs + "/" + key + + if key in self.IDEMPOTENT_IDS: + if base[key] != value: + self.module.warn( + f"Objects with distinct IDs should not be merged: {crumb}" + ) + continue - def _assemble_fragments(self, assembled_file): + if key not in base: + base[key] = value + elif not self.update_object(base[key], value, crumb) and base[key] != value: + self.module.warn( + f"Value being overwritten for key [{crumb}]; Old: [{base[key]}], New: [{value}]" + ) + base[key] = value + + if key in self.UNIQUE_IDS: + base[key] = list(set(base[key])) + + def update_list(self, base, template, breadcrumbs=""): + for item in template: + if isinstance(item, dict): + for attr in self.IDEMPOTENT_IDS: + if attr in item: + idempotent_id = attr + break + else: + idempotent_id = None + if idempotent_id: + namesake = [ + i for i in base if i[idempotent_id] == item[idempotent_id] + ] + if namesake: + self.update_dict( + namesake[0], + item, + breadcrumbs + + "/[" + + idempotent_id + + "=" + + item[idempotent_id] + + "]", + ) + continue + base.append(item) + base.sort(key=lambda x: json.dumps(x, sort_keys=True)) + + def assemble_fragments(self, assembled_file): # By file name sort order for f in sorted(os.listdir(self.src)): # Filter by regexp @@ -183,17 +243,18 @@ def _assemble_fragments(self, assembled_file): ): continue - with open(fragment, "rb") as fragment_file: - content = self.process_fragment(fragment_file) - - # Write the resulting bytes - if content is not None: - assembled_file.write(content) + with open(fragment, "r", encoding="utf-8") as fragment_file: + try: + self.update_object(self.MERGED, json.loads(fragment_file.read())) + except json.JSONDecodeError as e: + self.module.fail_json( + msg=f"JSON parsing error: {to_text(e.msg)}", error=to_native(e) + ) - # Finalize any remaining assembly - self.complete_assembly(assembled_file) + # Write out the final assembly + json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False) - # Close the assembled file handle + # Close the assembled file handle; will not delete for atomic_move assembled_file.close() def process(self): @@ -212,18 +273,13 @@ def process(self): msg=f"Regular expression, {self.regexp} is invalid: {to_native(e)}" ) - # Assemble fragments + # Assemble the src files into output file + # No deletion on close; atomic_move "removes" the file with tempfile.NamedTemporaryFile( - dir=self.module.tmpdir, delete=False - ) as assembled: + mode="w", encoding="utf-8", dir=self.module.tmpdir, delete=False + ) as assembled: # Process fragments into temporary file - self._assemble_fragments(assembled) - - # Confirm the assembled file is closed - if not assembled.closed: - self.module.fail_json( - msg=f"Assembled file, {assembled.name}, not closed after fragment processing" - ) + self.assemble_fragments(assembled) # Generate hashes for assembled file assembled_sha1 = self.module.sha1(assembled.name) @@ -242,9 +298,7 @@ def process(self): if assembled_sha1 != dest_sha1: if self.backup and dest_sha1 is not None: - self.output.update( - backup_file=self.module.backup_local(self.dest) - ) + self.output.update(backup_file=self.module.backup_local(self.dest)) self.module.atomic_move( assembled.name, self.dest, unsafe_writes=self.unsafe_writes From 8f359ffa1e2671d891f1535173828ef1d957a2e2 Mon Sep 17 00:00:00 2001 From: Webster Mudge Date: Mon, 20 Nov 2023 09:21:53 -0500 Subject: [PATCH 7/7] Add missing to_text import Signed-off-by: Webster Mudge --- plugins/modules/assemble_cluster_template.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/modules/assemble_cluster_template.py b/plugins/modules/assemble_cluster_template.py index 7710c6d5..39b64c8b 100644 --- a/plugins/modules/assemble_cluster_template.py +++ b/plugins/modules/assemble_cluster_template.py @@ -138,7 +138,7 @@ import tempfile from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.common.text.converters import to_native +from ansible.module_utils.common.text.converters import to_native, to_text class AssembleClusterTemplate(object):