diff --git a/nipyapi/__init__.py b/nipyapi/__init__.py index e6c39c3f..a84f9830 100644 --- a/nipyapi/__init__.py +++ b/nipyapi/__init__.py @@ -9,7 +9,7 @@ __author__ = """Daniel Chaffelson""" __email__ = 'chaffelson@gmail.com' -__version__ = '0.16.0' +__version__ = '0.16.1' __all__ = ['canvas', 'system', 'templates', 'config', 'nifi', 'registry', 'versioning', 'demo', 'utils', 'security', 'parameters'] diff --git a/nipyapi/utils.py b/nipyapi/utils.py index 089c4777..49cbc84b 100644 --- a/nipyapi/utils.py +++ b/nipyapi/utils.py @@ -28,7 +28,8 @@ 'wait_to_complete', 'is_endpoint_up', 'set_endpoint', 'start_docker_containers', 'DockerContainer', 'infer_object_label_from_class', 'bypass_slash_encoding', - 'exception_handler', 'enforce_min_ver', 'check_version' + 'exception_handler', 'enforce_min_ver', 'check_version', + 'validate_parameters_versioning_support' ] log = logging.getLogger(__name__) @@ -465,7 +466,7 @@ def strip_snapshot(java_version): def check_version(base, comparator=None, service='nifi'): """ - Compares version 'a' against either version 'b', or the version of the + Compares version base against either version comparator, or the version of the currently connected service instance. Since NiFi is java, it may return a version with -SNAPSHOT as part of it. @@ -478,20 +479,34 @@ def check_version(base, comparator=None, service='nifi'): service (str): The service to test the version against, currently only supports NiFi - Returns (int): -1 if a is lower, 0 if equal, and 1 if newer + Returns (int): -1 if base is lower, 0 if equal, and 1 if newer than comparator """ assert isinstance(base, six.string_types) assert comparator is None or isinstance(comparator, six.string_types) - assert service == 'nifi' + assert service in ['nifi', 'registry'] # This call currently only supports NiFi ver_a = version.parse(base) if comparator: # if b is set, we compare the passed versions comparator = strip_snapshot(comparator) ver_b = version.parse(comparator) + elif service == 'registry': + try: + reg_swagger_def = nipyapi.registry.ApiClient().call_api( + '/swagger/swagger.json', 'GET', _preload_content=False, + auth_settings=nipyapi.config.registry_config.enabled_auth + ) + reg_json = load(reg_swagger_def[0].data) + ver_b = version.parse(reg_json['info']['version']) + except nipyapi.registry.rest.ApiException as e: + if e.status == 404: + log.warning("Unable to retrieve swagger.json from registry to check version, assuming older than 0.3") + ver_b = version.parse('0.2.0') + else: + raise else: - # if b not set, we compare a against the connected nifi instance + # Working with NiFi ver_b = version.parse(strip_snapshot(nipyapi.system.get_nifi_version_info().ni_fi_version)) if ver_b > ver_a: return -1 @@ -500,18 +515,25 @@ def check_version(base, comparator=None, service='nifi'): return 0 -def enforce_min_ver(min_version, bool_response=False): +def validate_parameters_versioning_support(): + if enforce_min_ver('1.10', bool_response=True) or enforce_min_ver('0.6', service='registry', bool_response=True): + log.warning("Connected NiFi Registry does not support Parameter Contexts and they will be lost in " + "Version Control".format()) + + +def enforce_min_ver(min_version, bool_response=False, service='nifi'): """ Raises an error if target NiFi environment is not minimum version Args: min_version (str): Version to check against bool_response (bool): If True, will return True instead of raising error + service: nifi or registry Returns: (bool) or (NotImplementedError) """ - if check_version(min_version) == 1: + if check_version(min_version, service=service) == 1: if not bool_response: raise NotImplementedError( "This function is not available " diff --git a/nipyapi/versioning.py b/nipyapi/versioning.py index da8a264d..0addbee5 100644 --- a/nipyapi/versioning.py +++ b/nipyapi/versioning.py @@ -240,36 +240,25 @@ def save_flow_ver(process_group, registry_client, bucket, flow_name=None, target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id') else: target_pg = process_group - if nipyapi.utils.check_version('1.10.0') <= 0: - body = nipyapi.nifi.StartVersionControlRequestEntity( - process_group_revision=target_pg.revision, - versioned_flow=nipyapi.nifi.VersionedFlowDTO( + flow_dto = nipyapi.nifi.VersionedFlowDTO( bucket_id=bucket.identifier, comments=comment, description=desc, flow_name=flow_name, flow_id=flow_id, - registry_id=registry_client.id, - action='FORCE_COMMIT' if force else 'COMMIT' + registry_id=registry_client.id ) - ) - else: + if nipyapi.utils.check_version('1.10.0') <= 0: # no 'action' property in versions < 1.10 - body = nipyapi.nifi.StartVersionControlRequestEntity( - process_group_revision=target_pg.revision, - versioned_flow={ - 'bucketId': bucket.identifier, - 'comments': comment, - 'description': desc, - 'flowName': flow_name, - 'flowId': flow_id, - 'registryId': registry_client.id - } - ) + flow_dto.action = 'FORCE_COMMIT' if force else 'COMMIT' with nipyapi.utils.rest_exceptions(): + nipyapi.utils.validate_parameters_versioning_support() return nipyapi.nifi.VersionsApi().save_to_flow_registry( id=target_pg.id, - body=body + body=nipyapi.nifi.StartVersionControlRequestEntity( + process_group_revision=target_pg.revision, + versioned_flow=flow_dto + ) ) @@ -523,6 +512,7 @@ def create_flow_version(flow, flow_snapshot, refresh=True): for obj in [target_bucket, target_flow]: for p in bad_params: obj.__setattr__(p, None) + nipyapi.utils.validate_parameters_versioning_support() return nipyapi.registry.BucketFlowsApi().create_flow_version( bucket_id=target_bucket.identifier, flow_id=target_flow.identifier, @@ -530,6 +520,8 @@ def create_flow_version(flow, flow_snapshot, refresh=True): flow=target_flow, bucket=target_bucket, flow_contents=flow_snapshot.flow_contents, + parameter_contexts=flow_snapshot.parameter_contexts, + external_controller_services=flow_snapshot.external_controller_services, snapshot_metadata=VfsMd( version=target_flow.version_count + 1, comments=flow_snapshot.snapshot_metadata.comments, @@ -689,6 +681,7 @@ def import_flow_version(bucket_id, encoded_flow=None, file_path=None, " add this version to, or flow_name must be a unique " "name for a flow in this bucket, but not both") # Now write the new version + nipyapi.utils.validate_parameters_versioning_support() return create_flow_version( flow=ver_flow, flow_snapshot=imported_flow, diff --git a/resources/docker/tox-full/docker-compose.yml b/resources/docker/tox-full/docker-compose.yml index 2c60fc41..6dda53be 100644 --- a/resources/docker/tox-full/docker-compose.yml +++ b/resources/docker/tox-full/docker-compose.yml @@ -30,6 +30,12 @@ services: hostname: nifi ports: - "8080:8080" + nifidev: + image: apache/nifi:1.12.1 + container_name: nifidev + hostname: nifidev + ports: + - "8081:8080" registry-010: image: apache/nifi-registry:0.1.0 container_name: registry-010 @@ -54,3 +60,11 @@ services: - "18080:18080" environment: - NIFI_REGISTRY_WEB_HTTP_PORT=18080 + registrydev: + image: apache/nifi-registry:0.7.0 + container_name: registrydev + hostname: registrydev + ports: + - "18081:18081" + environment: + - NIFI_REGISTRY_WEB_HTTP_PORT=18081 diff --git a/setup.cfg b/setup.cfg index 9ade118d..f9d4d16a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.16.0 +current_version = 0.16.1 commit = True tag = True diff --git a/setup.py b/setup.py index cd5e7472..2fd00483 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ with open('docs/history.rst') as history_file: history = history_file.read() -proj_version = '0.16.0' +proj_version = '0.16.1' with open('requirements.txt') as reqs_file: requirements = reqs_file.read().splitlines() diff --git a/tests/conftest.py b/tests/conftest.py index d6b466ed..1588a4c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -594,8 +594,8 @@ def fixture_flow_serde(request, tmpdir, fix_ver_flow): 'FixtureFlowSerde', getattr(fix_ver_flow, '_fields') + ('filepath', 'json', 'yaml', 'raw') ) - f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)\ - .join(test_ver_export_filename)) + f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir) + .join(test_ver_export_filename)) f_raw = nipyapi.versioning.get_flow_version( bucket_id=fix_ver_flow.bucket.identifier, flow_id=fix_ver_flow.flow.identifier, diff --git a/tests/test_versioning.py b/tests/test_versioning.py index 02a8b215..a950de0b 100644 --- a/tests/test_versioning.py +++ b/tests/test_versioning.py @@ -7,7 +7,7 @@ import pytest from deepdiff import DeepDiff from tests import conftest -from nipyapi import registry, nifi, versioning, canvas, utils, config +from nipyapi import registry, nifi, versioning, canvas, utils, config, parameters def test_create_registry_client(regress_flow_reg): @@ -139,7 +139,7 @@ def test_save_flow_ver(regress_flow_reg, fix_bucket, fix_pg, fix_proc): ) assert isinstance(r2, nifi.VersionControlInformationEntity) assert r2.version_control_information.version > \ - r1.version_control_information.version + r1.version_control_information.version with pytest.raises(ValueError): _ = versioning.save_flow_ver( process_group=f_pg, @@ -188,7 +188,7 @@ def test_get_flow_in_bucket(regress_flow_reg, fix_ver_flow): 'id' ) assert isinstance(r1, registry.VersionedFlow) - assert r1.identifier == fix_ver_flow.info.version_control_information.\ + assert r1.identifier == fix_ver_flow.info.version_control_information. \ flow_id r2 = versioning.get_flow_in_bucket(fix_ver_flow.bucket.identifier, 'fakenews', 'id') @@ -394,10 +394,53 @@ def test_import_flow_version(regress_flow_reg, fix_flow_serde): ) == {} +def test_issue_229(regress_flow_reg, fix_bucket, fix_pg, fix_context): + # test we can deploy and imported flow, issue 229 + if utils.enforce_min_ver('1.10.0', bool_response=True) or utils.enforce_min_ver('0.6.0', service='registry', + bool_response=True): + pass + else: + reg_client = conftest.ensure_registry_client(config.registry_local_name) + bucket = fix_bucket() + pg = fix_pg.generate() + context = fix_context.generate() + parameters.assign_context_to_process_group(pg, context.id) + save_flow_ver = versioning.save_flow_ver( + process_group=pg, + registry_client=reg_client, + bucket=bucket, + flow_name=conftest.test_versioned_flow_name, + comment='NiPyApi Test', + desc='NiPyApi Test' + ) + flow_raw = versioning.get_flow_version( + bucket_id=bucket.identifier, + flow_id=save_flow_ver.version_control_information.flow_id, + export=True + ) + # Check that it is being exported correctly + # Older versions of Registry will drop unsupported parameterContext information + if 'parameterContexts' in utils.load(flow_raw).keys(): + imported_flow = versioning.import_flow_version( + bucket_id=bucket.identifier, + encoded_flow=flow_raw, + flow_name=conftest.test_versioned_flow_name + '_229' + ) + deployed_flow = versioning.deploy_flow_version( + parent_id=canvas.get_root_pg_id(), + location=(0, 0), + bucket_id=bucket.identifier, + flow_id=imported_flow.flow.identifier, + reg_client_id=reg_client.id, + version=None + ) + assert isinstance(deployed_flow, nifi.ProcessGroupEntity) + + def test_deploy_flow_version(regress_flow_reg, fix_ver_flow): r1 = versioning.deploy_flow_version( parent_id=canvas.get_root_pg_id(), - location=(0,0), + location=(0, 0), bucket_id=fix_ver_flow.bucket.identifier, flow_id=fix_ver_flow.flow.identifier, reg_client_id=fix_ver_flow.client.id,