From dbc56774f61cc143e3d79a387e73a6f6aebcaa37 Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Fri, 29 Oct 2021 18:32:15 +0100 Subject: [PATCH 1/4] WIP Signed-off-by: Daniel Chaffelson --- src/cdpy/common.py | 68 ++++++++++++++++++++------- src/cdpy/df.py | 112 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 162 insertions(+), 18 deletions(-) diff --git a/src/cdpy/common.py b/src/cdpy/common.py index e0cc158..a29ac1f 100644 --- a/src/cdpy/common.py +++ b/src/cdpy/common.py @@ -12,6 +12,7 @@ import warnings import traceback import urllib3 +from urllib.parse import urljoin from urllib3.exceptions import InsecureRequestWarning from json import JSONDecodeError from typing import Union @@ -128,7 +129,7 @@ def __init__(self, value, field='error_code', default=None, warning=None): class StaticCredentials(Credentials): """A credential class that simply takes a set of static credentials.""" - def __init__(self, access_key_id, private_key, access_token='', method='static'): + def __init__(self, access_key_id='', private_key='', access_token='', method='static'): super(StaticCredentials, self).__init__( access_key_id=access_key_id, private_key=private_key, access_token=access_token, method=method @@ -158,6 +159,7 @@ def __init__(self, debug=False, tls_verify=False, strict_errors=False, tls_warni _loader = Loader() _user_agent = self._make_user_agent_header() + self._client_creator = ClientCreator( _loader, Context(), @@ -274,6 +276,9 @@ def _warning_format(message, category, filename, lineno, line=None): self.CREDENTIAL_NAME_PATTERN = re.compile(r'[^a-z0-9-]') self.OPERATION_REGEX = re.compile(r'operation ([0-9a-zA-Z-]{36}) running') + # Workload services with special credential and endpoint handling + self.WORKLOAD_SERVICES = ['dfworkload'] + def _make_user_agent_header(self): cdpy_version = pkg_resources.get_distribution('cdpy').version return '%s CDPY/%s CDPCLI/%s Python/%s %s/%s' % ( @@ -305,24 +310,44 @@ def _setup_logger(self, log_level, log_format): self.logger.addHandler(handler) - def _build_client(self, service): - if not self.cdp_credentials: - self.cdp_credentials = self._client_creator.context.get_credentials() + def _build_client(self, service, parameters=None): + if service in self.WORKLOAD_SERVICES: + if service == 'dfworkload': + workload_name = 'DF' + else: + workload_name = None + self.throw_error(CdpError("Workload %s not recognised for client generation" % service)) + if 'environmentCrn' not in parameters: + self.throw_error(CdpError("environmentCrn must be supplied when connecting to %s" % service)) + df_access_token = self.call( + svc='iam', func='generate_workload_auth_token', + workloadName=workload_name, environmentCrn=parameters['environmentCrn'] + ) + token = df_access_token['token'] + if not token.startswith('Bearer '): + token = 'Bearer ' + token + credentials = StaticCredentials(access_token=token) + endpoint_url = urljoin(df_access_token['endpointUrl'], '/') + else: + if not self.cdp_credentials: + self.cdp_credentials = self._client_creator.context.get_credentials() + credentials = self.cdp_credentials + endpoint_url = self.client_endpoint try: # region introduced in client version 0.9.42 client = self._client_creator.create_client( service_name=service, region=self.cp_region, - explicit_endpoint_url=self.client_endpoint, + explicit_endpoint_url=endpoint_url, tls_verification=self.tls_verify, - credentials=self.cdp_credentials + credentials=credentials ) except TypeError: client = self._client_creator.create_client( service_name=service, - explicit_endpoint_url=self.client_endpoint, + explicit_endpoint_url=endpoint_url, tls_verification=self.tls_verify, - credentials=self.cdp_credentials + credentials=credentials ) return client @@ -358,11 +383,22 @@ def _default_throw_warning(warning: 'CdpWarning'): def regex_search(pattern, obj): return re.search(pattern, obj) - def validate_crn(self, obj: str): - if obj is not None and obj.startswith('crn:'): - pass - else: - self.throw_error(CdpError("Supplied env_crn %s is not a valid CDP crn" % str(obj))) + def validate_crn(self, obj: str, crn_type=None): + # TODO: Rework with check_strings as pass through from module + check_strings = ['crn:'] + crn_type = crn_type if crn_type is not None else 'generic' + if crn_type == 'env': + check_strings += [':environments:', ':environment:'] + if crn_type == 'df': + check_strings += [':df:', ':service:'] + if crn_type == 'flow': + check_strings += [':df:', ':flow:'] + if crn_type == 'deployment': + check_strings += [':df:', ':deployment:'] + for substring in check_strings: + if substring not in obj: + self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" + % (str(obj), crn_type, substring))) @staticmethod def sleep(seconds): @@ -388,10 +424,10 @@ def _convert(o): return json.dumps(data, indent=2, default=_convert) - def _client(self, service): + def _client(self, service, parameters=None): """Builds a CDP Endpoint client of a given type, and caches it against later reuse""" if service not in self._clients: - self._clients[service] = self._build_client(service) + self._clients[service] = self._build_client(service, parameters) return self._clients[service] def read_file(self, file_path): @@ -520,7 +556,7 @@ def call(self, svc: str, func: str, ret_field: str = None, squelch: ['Squelch'] Returns (dict, list, None): Output of CDP CLI Call """ try: - call_function = getattr(self._client(service=svc), func) + call_function = getattr(self._client(service=svc, parameters=kwargs), func) if self.scrub_inputs: # Remove unused submission values as the API rejects them payload = {x: y for x, y in kwargs.items() if y is not None} diff --git a/src/cdpy/df.py b/src/cdpy/df.py index 40156a5..80fe174 100644 --- a/src/cdpy/df.py +++ b/src/cdpy/df.py @@ -1,17 +1,19 @@ # -*- coding: utf-8 -*- from cdpy.common import CdpSdkBase, Squelch, CdpError +from cdpcli.extensions.df.createdeployment import CreateDeploymentOperationCaller class CdpyDf(CdpSdkBase): def __init__(self, *args, **kwargs): + self.DEPLOYMENT_SIZES = ['EXTRA_SMALL', 'SMALL', 'MEDIUM', 'LARGE'] super().__init__(*args, **kwargs) def list_services(self, only_enabled=False, env_crn=None, df_crn=None, name=None): result = self.sdk.call( svc='df', func='list_services', ret_field='services', squelch=[ Squelch(value='NOT_FOUND', default=list(), - warning='No DataFlow Deployments found') + warning='No DataFlow Services found') ], pageSize=self.sdk.DEFAULT_PAGE_SIZE ) @@ -35,15 +37,17 @@ def describe_service(self, df_crn: str = None, env_crn: str = None): elif len(services) == 1: resolved_df_crn = services[0]['crn'] else: + resolved_df_crn = None self.sdk.throw_error( CdpError('More than one DataFlow service found for env_crn, please try list instead') ) else: + resolved_df_crn = None self.sdk.throw_error(CdpError("Either df_crn or env_crn must be supplied to df.describe_service")) return self.sdk.call( svc='df', func='describe_service', ret_field='service', squelch=[ Squelch(value='NOT_FOUND', - warning='No DataFlow Deployment with crn %s found' % df_crn), + warning='No DataFlow Service with crn %s found' % df_crn), Squelch(value='PERMISSION_DENIED') # DF GRPC sometimes returns 403 when finishing deletion ], serviceCrn=resolved_df_crn @@ -73,3 +77,107 @@ def reset_service(self, df_crn: str): svc='df', func='reset_service', serviceCrn=df_crn ) + + def list_deployments(self, env_crn=None, df_crn=None, name=None): + result = self.sdk.call( + svc='df', func='list_deployments', ret_field='deployments', squelch=[ + Squelch(value='NOT_FOUND', default=list(), + warning='No DataFlow Deployments found') + ], + pageSize=self.sdk.DEFAULT_PAGE_SIZE + ) + if name is not None: + result = [x for x in result if x['name'] == name] + if df_crn is not None: + result = [x for x in result if x['service']['crn'] == df_crn] + if env_crn is not None: + result = [x for x in result if x['service']['environmentCrn'] == env_crn] + return result + + def describe_deployment(self, dep_crn): + self.sdk.validate_crn(dep_crn) + return self.sdk.call( + svc='df', func='describe_deployment', ret_field='deployment', squelch=[ + Squelch(value='NOT_FOUND', + warning='No DataFlow Deployment with crn %s found' % dep_crn) + ], + deploymentCrn=dep_crn + ) + + def list_readyflows(self, name=None): + result = self.sdk.call( + svc='df', func='list_readyflows', ret_field='readyflows', squelch=[ + Squelch(value='NOT_FOUND', + warning='No ReadyFlows found within your CDP Tenant') + ], + ) + if name is not None: + result = [x for x in result if x['name'] == name] + return result + + def resolve_environment_from_dataflow(self, df_crn): + self.sdk.validate_crn(df_crn, 'df') + df_info = self.describe_service(df_crn=df_crn) + if df_info: + return df_info['environmentCrn'] + else: + self.sdk.throw_error(CdpError("Could not resolve an Environment CRN from DataFlow CRN %s" % df_crn)) + + def create_deployment(self, df_crn, flow_ver_crn, deployment_name, size_name=None, static_node_count=None, + autoscale_enabled=None, autoscale_nodes_min=None, autoscale_nodes_max=None, nifi_ver=None, + autostart_flow=None, parameters=None, kpis=None): + # Validations + if size_name is not None and size_name not in self.DEPLOYMENT_SIZES: + self.sdk.throw_error(CdpError("Deployment size_name %s not in supported size list: %s" + % (size_name, str(self.DEPLOYMENT_SIZES)))) + _ = [self.sdk.validate_crn(x[0], x[1]) for x in [(df_crn, 'df'), (flow_ver_crn, 'flow')]] + if self.list_deployments(name=deployment_name): + self.sdk.throw_error(CdpError("Deployment already exists with conflicting name %s" % deployment_name)) + # Setup + config = dict( + autoStartFlow=autostart_flow if autostart_flow is not None else True, + parameterGroups=parameters, + deploymentName=deployment_name, + environmentCrn=self.resolve_environment_from_dataflow(df_crn), + clusterSizeName=size_name if size_name is not None else 'EXTRA_SMALL', + cfmNifiVersion=nifi_ver, + kpis=kpis + ) + if autoscale_enabled: + config['autoScalingEnabled'] = True + config['autoScaleMinNodes'] = autoscale_nodes_min if autoscale_nodes_min is not None else 1 + config['autoScaleMaxNodes'] = autoscale_nodes_max if autoscale_nodes_max is not None else 3 + else: + config['staticNodeCount'] = static_node_count if static_node_count is not None else 1 + + # cdpcli/extensions/df/createdeployment.py cdpcli-beta v0.9.48+ + dep_req_crn = self.sdk.call( + svc='df', func='initiate_deployment', ret_field='deploymentRequestCrn', + serviceCrn=df_crn, flowVersionCrn=flow_ver_crn + ) + df_handler = CreateDeploymentOperationCaller() + df_handler._upload_assets( + df_workload_client=self.sdk._client( + service='dfworkload', + parameters=config + ), + deployment_request_crn=dep_req_crn, + parameters=config + ) + resp = df_handler._create_deployment( + df_workload_client=self.sdk._client( + service='dfworkload', + parameters=config + ), + deployment_request_crn=dep_req_crn, + environment_crn=config['environmentCrn'], + parameters=config + ) + return resp + + def terminate_deployment(self, env_crn, dep_crn): + _ = [self.sdk.validate_crn(x[0], x[1]) for x in [(env_crn, 'env'), (dep_crn, 'deployment')]] + return self.sdk.call( + svc='dfworkload', func='terminate_deployment', ret_field='deployment', + environmentCrn=env_crn, deploymentCrn=dep_crn + ) From 119edada79fc5589705268b304f808a5cdab8074 Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Thu, 16 Dec 2021 20:27:22 +0000 Subject: [PATCH 2/4] WIP Signed-off-by: Daniel Chaffelson --- src/cdpy/common.py | 8 ++- src/cdpy/df.py | 144 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 133 insertions(+), 19 deletions(-) diff --git a/src/cdpy/common.py b/src/cdpy/common.py index a29ac1f..731c5b5 100644 --- a/src/cdpy/common.py +++ b/src/cdpy/common.py @@ -393,12 +393,14 @@ def validate_crn(self, obj: str, crn_type=None): check_strings += [':df:', ':service:'] if crn_type == 'flow': check_strings += [':df:', ':flow:'] + if crn_type == 'readyflow': + check_strings += [':df:', 'readyFlow'] if crn_type == 'deployment': check_strings += [':df:', ':deployment:'] for substring in check_strings: - if substring not in obj: - self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" - % (str(obj), crn_type, substring))) + if substring not in obj: + self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" + % (str(obj), crn_type, substring))) @staticmethod def sleep(seconds): diff --git a/src/cdpy/df.py b/src/cdpy/df.py index 80fe174..0ac86c2 100644 --- a/src/cdpy/df.py +++ b/src/cdpy/df.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from cdpy.common import CdpSdkBase, Squelch, CdpError +from cdpy.common import CdpSdkBase, Squelch, CdpError, CdpWarning from cdpcli.extensions.df.createdeployment import CreateDeploymentOperationCaller @@ -18,7 +18,7 @@ def list_services(self, only_enabled=False, env_crn=None, df_crn=None, name=None pageSize=self.sdk.DEFAULT_PAGE_SIZE ) if only_enabled: - result = [x for x in result if x['status']['state'] not in ['NOT_ENABLED']] + result = [x for x in result if x['status']['state'] in self.sdk.STARTED_STATES] if name is not None: result = [x for x in result if x['name'] == name] if df_crn is not None: @@ -53,15 +53,27 @@ def describe_service(self, df_crn: str = None, env_crn: str = None): serviceCrn=resolved_df_crn ) + def resolve_service_crn_from_name(self, name): + listing = self.list_services(only_enabled=True, name=name) + # More than one DF Service may exist with a given name if it was previously uncleanly deleted + if len(listing) == 1: + return listing[0]['crn'] + elif len(listing) == 0: + self.sdk.throw_warning(CdpWarning("No DataFlow Service found matching name %s" % name)) + return None + else: + self.sdk.throw_error(CdpError("Multiple DataFlow Services found matching name %s" % name)) + def enable_service(self, env_crn: str, lb_ips: list = None, min_nodes: int = 3, max_nodes: int = 3, enable_public_ip: bool = True, kube_ips: list = None, cluster_subnets: list = None, - lb_subnets: list = None): + lb_subnets: list = None, tags: list = None): self.sdk.validate_crn(env_crn) return self.sdk.call( svc='df', func='enable_service', ret_field='service', environmentCrn=env_crn, minK8sNodeCount=min_nodes, maxK8sNodeCount=max_nodes, usePublicLoadBalancer=enable_public_ip, kubeApiAuthorizedIpRanges=kube_ips, - loadBalancerAuthorizedIpRanges=lb_ips, clusterSubnets=cluster_subnets, loadBalancerSubnets=lb_subnets + loadBalancerAuthorizedIpRanges=lb_ips, clusterSubnets=cluster_subnets, + loadBalancerSubnets=lb_subnets, tags=tags ) def disable_service(self, df_crn: str, persist: bool = False, terminate=False): @@ -94,8 +106,25 @@ def list_deployments(self, env_crn=None, df_crn=None, name=None): result = [x for x in result if x['service']['environmentCrn'] == env_crn] return result - def describe_deployment(self, dep_crn): - self.sdk.validate_crn(dep_crn) + def describe_deployment(self, dep_crn=None, df_crn=None, name=None): + if dep_crn is not None: + self.sdk.validate_crn(dep_crn) + elif df_crn is not None and name is not None: + deployments = self.list_deployments(df_crn=df_crn, name=name) + if len(deployments) == 0: + return None + elif len(deployments) == 1: + dep_crn = deployments[0]['crn'] + else: + self.sdk.throw_error( + CdpError('More than one DataFlow Deployment found, please try list instead') + ) + else: + self.sdk.throw_error( + CdpError( + "Either dep_crn or both of df_crn and name must be supplied" + ) + ) return self.sdk.call( svc='df', func='describe_deployment', ret_field='deployment', squelch=[ Squelch(value='NOT_FOUND', @@ -105,6 +134,7 @@ def describe_deployment(self, dep_crn): ) def list_readyflows(self, name=None): + # Lists readyflows that can be added to the Catalog for Deployment result = self.sdk.call( svc='df', func='list_readyflows', ret_field='readyflows', squelch=[ Squelch(value='NOT_FOUND', @@ -115,17 +145,97 @@ def list_readyflows(self, name=None): result = [x for x in result if x['name'] == name] return result - def resolve_environment_from_dataflow(self, df_crn): - self.sdk.validate_crn(df_crn, 'df') - df_info = self.describe_service(df_crn=df_crn) - if df_info: - return df_info['environmentCrn'] + def list_flow_definitions(self, name=None): + # Lists definitions in the Catalog. May contain more than one artefactType: flows, readyFlows + result = self.sdk.call( + svc='df', func='list_flow_definitions', ret_field='flows', squelch=[ + Squelch(value='NOT_FOUND', + warning='No Flow Definitions found within your CDP Tenant Catalog') + ], + ) + if name is not None: + result = [x for x in result if x['name'] == name] + return result + + def describe_added_readyflow(self, def_crn, sort_versions=True): + # Describes readyFlows added to the Catalog + self.sdk.validate_crn(def_crn, 'readyflow') + result = self.sdk.call( + svc='df', func='describe_added_readyflow', ret_field='readyflowDetail', squelch=[ + Squelch(value='NOT_FOUND', + warning='No ReadyFlow Definition with crn %s found' % def_crn) + ], + readyflowCrn=def_crn + ) + out = result + if sort_versions: + out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True) + return out + + def describe_customflow(self, def_crn, sort_versions=True): + self.sdk.validate_crn(def_crn, 'flow') + result = self.sdk.call( + svc='df', func='describe_flow', ret_field='flowDetail', squelch=[ + Squelch(value='NOT_FOUND', + warning='No Flow Definition with crn %s found' % def_crn) + ], + flowCrn=def_crn + ) + out = result + if sort_versions: + out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True) + return out + + def get_version_crn_from_flow_definition(self, flow_name, version=None): + summary_list = self.list_flow_definitions(name=flow_name) + if summary_list: + if len(summary_list) == 1: + flow_def = summary_list[0] + kind = flow_def['artifactType'] + if kind == 'flow': + detail = self.describe_customflow(flow_def['crn']) + elif kind == 'readyFlow': + detail = self.describe_added_readyflow(flow_def['crn']) + else: + detail = None + self.sdk.throw_error(CdpError("DataFlow Definition type not supported %s" % kind)) + if version is None: + # versions are sorted descending by default + return detail['versions'][0]['crn'] + else: + out = [x for x in detail['versions'] if x['version'] == version] + if out: + return out[0]['crn'] + else: + self.sdk.throw_error(CdpError( + "Could not find version %d for DataFlow Definition named %s" % (version, flow_name) + )) + else: + self.sdk.throw_error(CdpError("More than one DataFlow Definition found for name %s" % flow_name)) else: - self.sdk.throw_error(CdpError("Could not resolve an Environment CRN from DataFlow CRN %s" % df_crn)) + self.sdk.throw_warning(CdpWarning("DataFlow Definition not found for name %s" % flow_name)) + + def resolve_env_crn_from_df_crn(self, df_crn): + if ':service:' in df_crn: + self.sdk.validate_crn(df_crn, 'df') + df_info = self.describe_service(df_crn=df_crn) + if df_info: + return df_info['environmentCrn'] + elif ':deployment:' in df_crn: + self.sdk.validate_crn(df_crn, 'deployment') + df_info = self.describe_deployment(df_crn) + if df_info: + return df_info['service']['environmentCrn'] + else: + self.sdk.throw_error( + CdpError( + "Could not resolve an Environment CRN from DataFlow CRN %s" % df_crn + ) + ) def create_deployment(self, df_crn, flow_ver_crn, deployment_name, size_name=None, static_node_count=None, autoscale_enabled=None, autoscale_nodes_min=None, autoscale_nodes_max=None, nifi_ver=None, - autostart_flow=None, parameters=None, kpis=None): + autostart_flow=None, parameter_groups=None, kpis=None): # Validations if size_name is not None and size_name not in self.DEPLOYMENT_SIZES: self.sdk.throw_error(CdpError("Deployment size_name %s not in supported size list: %s" @@ -136,9 +246,9 @@ def create_deployment(self, df_crn, flow_ver_crn, deployment_name, size_name=Non # Setup config = dict( autoStartFlow=autostart_flow if autostart_flow is not None else True, - parameterGroups=parameters, + parameterGroups=parameter_groups, deploymentName=deployment_name, - environmentCrn=self.resolve_environment_from_dataflow(df_crn), + environmentCrn=self.resolve_env_crn_from_df_crn(df_crn), clusterSizeName=size_name if size_name is not None else 'EXTRA_SMALL', cfmNifiVersion=nifi_ver, kpis=kpis @@ -175,7 +285,9 @@ def create_deployment(self, df_crn, flow_ver_crn, deployment_name, size_name=Non ) return resp - def terminate_deployment(self, env_crn, dep_crn): + def terminate_deployment(self, dep_crn, env_crn=None): + if env_crn is None: + env_crn = self.resolve_env_crn_from_df_crn(df_crn=dep_crn) _ = [self.sdk.validate_crn(x[0], x[1]) for x in [(env_crn, 'env'), (dep_crn, 'deployment')]] return self.sdk.call( svc='dfworkload', func='terminate_deployment', ret_field='deployment', From 5825e5d9db600413d6813db844587cb1f5495cd4 Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Tue, 21 Dec 2021 18:11:09 +0000 Subject: [PATCH 3/4] Separate considerations for readyflows and customflows Add support for readyflows Remove comment support from df_service until string limitations are inline with rest of platform Add support for terminating deployed flows when disabling the DFX Service Signed-off-by: Daniel Chaffelson --- src/cdpy/common.py | 6 +++--- src/cdpy/df.py | 43 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/cdpy/common.py b/src/cdpy/common.py index 731c5b5..1fe45ab 100644 --- a/src/cdpy/common.py +++ b/src/cdpy/common.py @@ -398,9 +398,9 @@ def validate_crn(self, obj: str, crn_type=None): if crn_type == 'deployment': check_strings += [':df:', ':deployment:'] for substring in check_strings: - if substring not in obj: - self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" - % (str(obj), crn_type, substring))) + if substring not in obj: + self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" + % (str(obj), crn_type, substring))) @staticmethod def sleep(seconds): diff --git a/src/cdpy/df.py b/src/cdpy/df.py index 0ac86c2..e25083b 100644 --- a/src/cdpy/df.py +++ b/src/cdpy/df.py @@ -66,7 +66,7 @@ def resolve_service_crn_from_name(self, name): def enable_service(self, env_crn: str, lb_ips: list = None, min_nodes: int = 3, max_nodes: int = 3, enable_public_ip: bool = True, kube_ips: list = None, cluster_subnets: list = None, - lb_subnets: list = None, tags: list = None): + lb_subnets: list = None, tags: dict = None): self.sdk.validate_crn(env_crn) return self.sdk.call( svc='df', func='enable_service', ret_field='service', @@ -157,18 +157,51 @@ def list_flow_definitions(self, name=None): result = [x for x in result if x['name'] == name] return result + def describe_readyflow(self, def_crn): + # Describes readyFlow not added to the Catalog + self.sdk.validate_crn(def_crn, 'readyflow') + return self.sdk.call( + svc='df', func='describe_readyflow', ret_field='readyflowDetail', squelch=[ + Squelch(value='NOT_FOUND', + warning='No ReadyFlow Definition with crn %s found' % def_crn) + ], + readyflowCrn=def_crn + ) + + def import_readyflow(self, def_crn): + # Imports a Readyflow from the Control Plane into the Tenant Flow Catalog + self.sdk.validate_crn(def_crn, 'readyflow') + return self.sdk.call( + svc='df', func='add_readyflow', ret_field='addedReadyflowDetail', squelch=[ + Squelch(value='NOT_FOUND', + warning='No ReadyFlow Definition with crn %s found' % def_crn) + ], + readyflowCrn=def_crn + ) + + def delete_added_readyflow(self, def_crn): + # Deletes an added Readyflow from the Tenant Flow Catalog + self.sdk.validate_crn(def_crn, 'readyflow') + return self.sdk.call( + svc='df', func='delete_added_readyflow', ret_field='readyflowDetail', squelch=[ + Squelch(value='NOT_FOUND', + warning='No ReadyFlow Definition with crn %s found' % def_crn) + ], + readyflowCrn=def_crn + ) + def describe_added_readyflow(self, def_crn, sort_versions=True): # Describes readyFlows added to the Catalog self.sdk.validate_crn(def_crn, 'readyflow') result = self.sdk.call( - svc='df', func='describe_added_readyflow', ret_field='readyflowDetail', squelch=[ + svc='df', func='describe_added_readyflow', ret_field='addedReadyflowDetail', squelch=[ Squelch(value='NOT_FOUND', warning='No ReadyFlow Definition with crn %s found' % def_crn) ], readyflowCrn=def_crn ) out = result - if sort_versions: + if sort_versions and out: out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True) return out @@ -182,7 +215,7 @@ def describe_customflow(self, def_crn, sort_versions=True): flowCrn=def_crn ) out = result - if sort_versions: + if sort_versions and out: out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True) return out @@ -240,7 +273,7 @@ def create_deployment(self, df_crn, flow_ver_crn, deployment_name, size_name=Non if size_name is not None and size_name not in self.DEPLOYMENT_SIZES: self.sdk.throw_error(CdpError("Deployment size_name %s not in supported size list: %s" % (size_name, str(self.DEPLOYMENT_SIZES)))) - _ = [self.sdk.validate_crn(x[0], x[1]) for x in [(df_crn, 'df'), (flow_ver_crn, 'flow')]] + self.sdk.validate_crn(df_crn, 'df') if self.list_deployments(name=deployment_name): self.sdk.throw_error(CdpError("Deployment already exists with conflicting name %s" % deployment_name)) # Setup From 70f5a578aa7176f25396830b11cf324163590928 Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Thu, 24 Mar 2022 20:03:32 +0000 Subject: [PATCH 4/4] CDF Updates per reviews Improved application deployment playbook to only attempt Kafka flow deployment when Kafka Datahub is found Improved cdpy CRN validation by moving substring definitions to a constant and simplifying the validation logic Improved cdpy.df.describe_service to only run if the dataflow CRN is correctly resolved Force renamed readyflowCRN for an imported ReadyFlow to be addedReadyflowCrn to improve usability Renamed some tasks to improve distinction between similar but different activities like enabling the DF service vs deploying a DF flow Corrected various minor documentation points for DF modules Normalized responses to readyflow_info, deployment_info, and customflow_info to use listings of the full description of objects to simplify user experience Signed-off-by: Daniel Chaffelson --- src/cdpy/common.py | 27 ++++++++++++--------------- src/cdpy/df.py | 39 ++++++++++++++++++++++++--------------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/cdpy/common.py b/src/cdpy/common.py index 1fe45ab..44c0757 100644 --- a/src/cdpy/common.py +++ b/src/cdpy/common.py @@ -279,6 +279,16 @@ def _warning_format(message, category, filename, lineno, line=None): # Workload services with special credential and endpoint handling self.WORKLOAD_SERVICES = ['dfworkload'] + # substrings to check for in different CRNs + self.CRN_STRINGS = { + 'generic': ['crn:'], + 'env': [':environments:', ':environment:'], + 'df': [':df:', ':service:'], + 'flow': [':df:', ':flow:'], + 'readyflow': [':df:', 'readyFlow'], + 'deployment': [':df:', ':deployment:'] + } + def _make_user_agent_header(self): cdpy_version = pkg_resources.get_distribution('cdpy').version return '%s CDPY/%s CDPCLI/%s Python/%s %s/%s' % ( @@ -383,21 +393,8 @@ def _default_throw_warning(warning: 'CdpWarning'): def regex_search(pattern, obj): return re.search(pattern, obj) - def validate_crn(self, obj: str, crn_type=None): - # TODO: Rework with check_strings as pass through from module - check_strings = ['crn:'] - crn_type = crn_type if crn_type is not None else 'generic' - if crn_type == 'env': - check_strings += [':environments:', ':environment:'] - if crn_type == 'df': - check_strings += [':df:', ':service:'] - if crn_type == 'flow': - check_strings += [':df:', ':flow:'] - if crn_type == 'readyflow': - check_strings += [':df:', 'readyFlow'] - if crn_type == 'deployment': - check_strings += [':df:', ':deployment:'] - for substring in check_strings: + def validate_crn(self, obj: str, crn_type='generic'): + for substring in self.CRN_STRINGS[crn_type]: if substring not in obj: self.throw_error(CdpError("Supplied crn %s of proposed type %s is missing substring %s" % (str(obj), crn_type, substring))) diff --git a/src/cdpy/df.py b/src/cdpy/df.py index e25083b..55e942d 100644 --- a/src/cdpy/df.py +++ b/src/cdpy/df.py @@ -28,6 +28,7 @@ def list_services(self, only_enabled=False, env_crn=None, df_crn=None, name=None return result def describe_service(self, df_crn: str = None, env_crn: str = None): + resolved_df_crn = None if df_crn is not None: resolved_df_crn = df_crn elif env_crn is not None: @@ -37,24 +38,25 @@ def describe_service(self, df_crn: str = None, env_crn: str = None): elif len(services) == 1: resolved_df_crn = services[0]['crn'] else: - resolved_df_crn = None self.sdk.throw_error( CdpError('More than one DataFlow service found for env_crn, please try list instead') ) else: - resolved_df_crn = None self.sdk.throw_error(CdpError("Either df_crn or env_crn must be supplied to df.describe_service")) - return self.sdk.call( - svc='df', func='describe_service', ret_field='service', squelch=[ - Squelch(value='NOT_FOUND', - warning='No DataFlow Service with crn %s found' % df_crn), - Squelch(value='PERMISSION_DENIED') # DF GRPC sometimes returns 403 when finishing deletion - ], - serviceCrn=resolved_df_crn - ) + if resolved_df_crn is not None: + return self.sdk.call( + svc='df', func='describe_service', ret_field='service', squelch=[ + Squelch(value='NOT_FOUND', + warning='No DataFlow Service with crn %s found' % df_crn), + Squelch(value='PERMISSION_DENIED') # DF GRPC sometimes returns 403 when finishing deletion + ], + serviceCrn=resolved_df_crn + ) + else: + return None - def resolve_service_crn_from_name(self, name): - listing = self.list_services(only_enabled=True, name=name) + def resolve_service_crn_from_name(self, name, only_enabled=True): + listing = self.list_services(only_enabled=only_enabled, name=name) # More than one DF Service may exist with a given name if it was previously uncleanly deleted if len(listing) == 1: return listing[0]['crn'] @@ -90,7 +92,7 @@ def reset_service(self, df_crn: str): serviceCrn=df_crn ) - def list_deployments(self, env_crn=None, df_crn=None, name=None): + def list_deployments(self, env_crn=None, df_crn=None, name=None, dep_crn=None, described=False): result = self.sdk.call( svc='df', func='list_deployments', ret_field='deployments', squelch=[ Squelch(value='NOT_FOUND', default=list(), @@ -98,17 +100,22 @@ def list_deployments(self, env_crn=None, df_crn=None, name=None): ], pageSize=self.sdk.DEFAULT_PAGE_SIZE ) + if dep_crn is not None: + result = [x for x in result if x['crn'] == dep_crn] if name is not None: result = [x for x in result if x['name'] == name] if df_crn is not None: result = [x for x in result if x['service']['crn'] == df_crn] if env_crn is not None: result = [x for x in result if x['service']['environmentCrn'] == env_crn] - return result + if described is False: + return result + else: + return [self.describe_deployment(dep_crn=x['crn']) for x in result] def describe_deployment(self, dep_crn=None, df_crn=None, name=None): if dep_crn is not None: - self.sdk.validate_crn(dep_crn) + self.sdk.validate_crn(dep_crn, 'deployment') elif df_crn is not None and name is not None: deployments = self.list_deployments(df_crn=df_crn, name=name) if len(deployments) == 0: @@ -200,6 +207,8 @@ def describe_added_readyflow(self, def_crn, sort_versions=True): ], readyflowCrn=def_crn ) + # Force renaming readyflowCrn to addedReadyflowCrn to reduce user confusion + result['addedReadyflowCrn'] = result.pop('readyflowCrn') out = result if sort_versions and out: out['versions'] = sorted(result['versions'], key=lambda d: d['version'], reverse=True)