Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nipyapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

__author__ = """Daniel Chaffelson"""
__email__ = '[email protected]'
__version__ = '0.16.0'
__version__ = '0.16.1'
__all__ = ['canvas', 'system', 'templates', 'config', 'nifi', 'registry',
'versioning', 'demo', 'utils', 'security', 'parameters']

Expand Down
36 changes: 29 additions & 7 deletions nipyapi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
'wait_to_complete', 'is_endpoint_up', 'set_endpoint',
'start_docker_containers', 'DockerContainer',
'infer_object_label_from_class', 'bypass_slash_encoding',
'exception_handler', 'enforce_min_ver', 'check_version'
'exception_handler', 'enforce_min_ver', 'check_version',
'validate_parameters_versioning_support'
]

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -465,7 +466,7 @@ def strip_snapshot(java_version):

def check_version(base, comparator=None, service='nifi'):
"""
Compares version 'a' against either version 'b', or the version of the
Compares version base against either version comparator, or the version of the
currently connected service instance.

Since NiFi is java, it may return a version with -SNAPSHOT as part of it.
Expand All @@ -478,20 +479,34 @@ def check_version(base, comparator=None, service='nifi'):
service (str): The service to test the version against, currently
only supports NiFi

Returns (int): -1 if a is lower, 0 if equal, and 1 if newer
Returns (int): -1 if base is lower, 0 if equal, and 1 if newer than comparator

"""
assert isinstance(base, six.string_types)
assert comparator is None or isinstance(comparator, six.string_types)
assert service == 'nifi'
assert service in ['nifi', 'registry']
# This call currently only supports NiFi
ver_a = version.parse(base)
if comparator:
# if b is set, we compare the passed versions
comparator = strip_snapshot(comparator)
ver_b = version.parse(comparator)
elif service == 'registry':
try:
reg_swagger_def = nipyapi.registry.ApiClient().call_api(
'/swagger/swagger.json', 'GET', _preload_content=False,
auth_settings=nipyapi.config.registry_config.enabled_auth
)
reg_json = load(reg_swagger_def[0].data)
ver_b = version.parse(reg_json['info']['version'])
except nipyapi.registry.rest.ApiException as e:
if e.status == 404:
log.warning("Unable to retrieve swagger.json from registry to check version, assuming older than 0.3")
ver_b = version.parse('0.2.0')
else:
raise
else:
# if b not set, we compare a against the connected nifi instance
# Working with NiFi
ver_b = version.parse(strip_snapshot(nipyapi.system.get_nifi_version_info().ni_fi_version))
if ver_b > ver_a:
return -1
Expand All @@ -500,18 +515,25 @@ def check_version(base, comparator=None, service='nifi'):
return 0


def enforce_min_ver(min_version, bool_response=False):
def validate_parameters_versioning_support():
if enforce_min_ver('1.10', bool_response=True) or enforce_min_ver('0.6', service='registry', bool_response=True):
log.warning("Connected NiFi Registry does not support Parameter Contexts and they will be lost in "
"Version Control".format())


def enforce_min_ver(min_version, bool_response=False, service='nifi'):
"""
Raises an error if target NiFi environment is not minimum version
Args:
min_version (str): Version to check against
bool_response (bool): If True, will return True instead of
raising error
service: nifi or registry

Returns:
(bool) or (NotImplementedError)
"""
if check_version(min_version) == 1:
if check_version(min_version, service=service) == 1:
if not bool_response:
raise NotImplementedError(
"This function is not available "
Expand Down
33 changes: 13 additions & 20 deletions nipyapi/versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,36 +240,25 @@ def save_flow_ver(process_group, registry_client, bucket, flow_name=None,
target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id')
else:
target_pg = process_group
if nipyapi.utils.check_version('1.10.0') <= 0:
body = nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow=nipyapi.nifi.VersionedFlowDTO(
flow_dto = nipyapi.nifi.VersionedFlowDTO(
bucket_id=bucket.identifier,
comments=comment,
description=desc,
flow_name=flow_name,
flow_id=flow_id,
registry_id=registry_client.id,
action='FORCE_COMMIT' if force else 'COMMIT'
registry_id=registry_client.id
)
)
else:
if nipyapi.utils.check_version('1.10.0') <= 0:
# no 'action' property in versions < 1.10
body = nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow={
'bucketId': bucket.identifier,
'comments': comment,
'description': desc,
'flowName': flow_name,
'flowId': flow_id,
'registryId': registry_client.id
}
)
flow_dto.action = 'FORCE_COMMIT' if force else 'COMMIT'
with nipyapi.utils.rest_exceptions():
nipyapi.utils.validate_parameters_versioning_support()
return nipyapi.nifi.VersionsApi().save_to_flow_registry(
id=target_pg.id,
body=body
body=nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow=flow_dto
)
)


Expand Down Expand Up @@ -523,13 +512,16 @@ def create_flow_version(flow, flow_snapshot, refresh=True):
for obj in [target_bucket, target_flow]:
for p in bad_params:
obj.__setattr__(p, None)
nipyapi.utils.validate_parameters_versioning_support()
return nipyapi.registry.BucketFlowsApi().create_flow_version(
bucket_id=target_bucket.identifier,
flow_id=target_flow.identifier,
body=nipyapi.registry.VersionedFlowSnapshot(
flow=target_flow,
bucket=target_bucket,
flow_contents=flow_snapshot.flow_contents,
parameter_contexts=flow_snapshot.parameter_contexts,
external_controller_services=flow_snapshot.external_controller_services,
snapshot_metadata=VfsMd(
version=target_flow.version_count + 1,
comments=flow_snapshot.snapshot_metadata.comments,
Expand Down Expand Up @@ -689,6 +681,7 @@ def import_flow_version(bucket_id, encoded_flow=None, file_path=None,
" add this version to, or flow_name must be a unique "
"name for a flow in this bucket, but not both")
# Now write the new version
nipyapi.utils.validate_parameters_versioning_support()
return create_flow_version(
flow=ver_flow,
flow_snapshot=imported_flow,
Expand Down
14 changes: 14 additions & 0 deletions resources/docker/tox-full/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ services:
hostname: nifi
ports:
- "8080:8080"
nifidev:
image: apache/nifi:1.12.1
container_name: nifidev
hostname: nifidev
ports:
- "8081:8080"
registry-010:
image: apache/nifi-registry:0.1.0
container_name: registry-010
Expand All @@ -54,3 +60,11 @@ services:
- "18080:18080"
environment:
- NIFI_REGISTRY_WEB_HTTP_PORT=18080
registrydev:
image: apache/nifi-registry:0.7.0
container_name: registrydev
hostname: registrydev
ports:
- "18081:18081"
environment:
- NIFI_REGISTRY_WEB_HTTP_PORT=18081
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.16.0
current_version = 0.16.1
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
with open('docs/history.rst') as history_file:
history = history_file.read()

proj_version = '0.16.0'
proj_version = '0.16.1'

with open('requirements.txt') as reqs_file:
requirements = reqs_file.read().splitlines()
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ def fixture_flow_serde(request, tmpdir, fix_ver_flow):
'FixtureFlowSerde',
getattr(fix_ver_flow, '_fields') + ('filepath', 'json', 'yaml', 'raw')
)
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)\
.join(test_ver_export_filename))
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)
.join(test_ver_export_filename))
f_raw = nipyapi.versioning.get_flow_version(
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
Expand Down
51 changes: 47 additions & 4 deletions tests/test_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from deepdiff import DeepDiff
from tests import conftest
from nipyapi import registry, nifi, versioning, canvas, utils, config
from nipyapi import registry, nifi, versioning, canvas, utils, config, parameters


def test_create_registry_client(regress_flow_reg):
Expand Down Expand Up @@ -139,7 +139,7 @@ def test_save_flow_ver(regress_flow_reg, fix_bucket, fix_pg, fix_proc):
)
assert isinstance(r2, nifi.VersionControlInformationEntity)
assert r2.version_control_information.version > \
r1.version_control_information.version
r1.version_control_information.version
with pytest.raises(ValueError):
_ = versioning.save_flow_ver(
process_group=f_pg,
Expand Down Expand Up @@ -188,7 +188,7 @@ def test_get_flow_in_bucket(regress_flow_reg, fix_ver_flow):
'id'
)
assert isinstance(r1, registry.VersionedFlow)
assert r1.identifier == fix_ver_flow.info.version_control_information.\
assert r1.identifier == fix_ver_flow.info.version_control_information. \
flow_id
r2 = versioning.get_flow_in_bucket(fix_ver_flow.bucket.identifier,
'fakenews', 'id')
Expand Down Expand Up @@ -394,10 +394,53 @@ def test_import_flow_version(regress_flow_reg, fix_flow_serde):
) == {}


def test_issue_229(regress_flow_reg, fix_bucket, fix_pg, fix_context):
# test we can deploy and imported flow, issue 229
if utils.enforce_min_ver('1.10.0', bool_response=True) or utils.enforce_min_ver('0.6.0', service='registry',
bool_response=True):
pass
else:
reg_client = conftest.ensure_registry_client(config.registry_local_name)
bucket = fix_bucket()
pg = fix_pg.generate()
context = fix_context.generate()
parameters.assign_context_to_process_group(pg, context.id)
save_flow_ver = versioning.save_flow_ver(
process_group=pg,
registry_client=reg_client,
bucket=bucket,
flow_name=conftest.test_versioned_flow_name,
comment='NiPyApi Test',
desc='NiPyApi Test'
)
flow_raw = versioning.get_flow_version(
bucket_id=bucket.identifier,
flow_id=save_flow_ver.version_control_information.flow_id,
export=True
)
# Check that it is being exported correctly
# Older versions of Registry will drop unsupported parameterContext information
if 'parameterContexts' in utils.load(flow_raw).keys():
imported_flow = versioning.import_flow_version(
bucket_id=bucket.identifier,
encoded_flow=flow_raw,
flow_name=conftest.test_versioned_flow_name + '_229'
)
deployed_flow = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0, 0),
bucket_id=bucket.identifier,
flow_id=imported_flow.flow.identifier,
reg_client_id=reg_client.id,
version=None
)
assert isinstance(deployed_flow, nifi.ProcessGroupEntity)


def test_deploy_flow_version(regress_flow_reg, fix_ver_flow):
r1 = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0,0),
location=(0, 0),
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
reg_client_id=fix_ver_flow.client.id,
Expand Down