Skip to content

Commit d00221b

Browse files
authored
Fix issue 229 (#246)
* fixes #229 Found that importing a versioned flow was stripping out fields added in newer versions of NiFi. Also found that older versions of registry do not support these Fields and were also being silently stripped, implemented version checks and warnings for old registry or NiFi not supporting Parameters. * Bump version: 0.16.0 → 0.16.1
1 parent 39c0723 commit d00221b

File tree

8 files changed

+108
-36
lines changed

8 files changed

+108
-36
lines changed

nipyapi/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
__author__ = """Daniel Chaffelson"""
1111
__email__ = '[email protected]'
12-
__version__ = '0.16.0'
12+
__version__ = '0.16.1'
1313
__all__ = ['canvas', 'system', 'templates', 'config', 'nifi', 'registry',
1414
'versioning', 'demo', 'utils', 'security', 'parameters']
1515

nipyapi/utils.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
'wait_to_complete', 'is_endpoint_up', 'set_endpoint',
2929
'start_docker_containers', 'DockerContainer',
3030
'infer_object_label_from_class', 'bypass_slash_encoding',
31-
'exception_handler', 'enforce_min_ver', 'check_version'
31+
'exception_handler', 'enforce_min_ver', 'check_version',
32+
'validate_parameters_versioning_support'
3233
]
3334

3435
log = logging.getLogger(__name__)
@@ -465,7 +466,7 @@ def strip_snapshot(java_version):
465466

466467
def check_version(base, comparator=None, service='nifi'):
467468
"""
468-
Compares version 'a' against either version 'b', or the version of the
469+
Compares version base against either version comparator, or the version of the
469470
currently connected service instance.
470471
471472
Since NiFi is java, it may return a version with -SNAPSHOT as part of it.
@@ -478,20 +479,34 @@ def check_version(base, comparator=None, service='nifi'):
478479
service (str): The service to test the version against, currently
479480
only supports NiFi
480481
481-
Returns (int): -1 if a is lower, 0 if equal, and 1 if newer
482+
Returns (int): -1 if base is lower, 0 if equal, and 1 if newer than comparator
482483
483484
"""
484485
assert isinstance(base, six.string_types)
485486
assert comparator is None or isinstance(comparator, six.string_types)
486-
assert service == 'nifi'
487+
assert service in ['nifi', 'registry']
487488
# This call currently only supports NiFi
488489
ver_a = version.parse(base)
489490
if comparator:
490491
# if b is set, we compare the passed versions
491492
comparator = strip_snapshot(comparator)
492493
ver_b = version.parse(comparator)
494+
elif service == 'registry':
495+
try:
496+
reg_swagger_def = nipyapi.registry.ApiClient().call_api(
497+
'/swagger/swagger.json', 'GET', _preload_content=False,
498+
auth_settings=nipyapi.config.registry_config.enabled_auth
499+
)
500+
reg_json = load(reg_swagger_def[0].data)
501+
ver_b = version.parse(reg_json['info']['version'])
502+
except nipyapi.registry.rest.ApiException as e:
503+
if e.status == 404:
504+
log.warning("Unable to retrieve swagger.json from registry to check version, assuming older than 0.3")
505+
ver_b = version.parse('0.2.0')
506+
else:
507+
raise
493508
else:
494-
# if b not set, we compare a against the connected nifi instance
509+
# Working with NiFi
495510
ver_b = version.parse(strip_snapshot(nipyapi.system.get_nifi_version_info().ni_fi_version))
496511
if ver_b > ver_a:
497512
return -1
@@ -500,18 +515,25 @@ def check_version(base, comparator=None, service='nifi'):
500515
return 0
501516

502517

503-
def enforce_min_ver(min_version, bool_response=False):
518+
def validate_parameters_versioning_support():
519+
if enforce_min_ver('1.10', bool_response=True) or enforce_min_ver('0.6', service='registry', bool_response=True):
520+
log.warning("Connected NiFi Registry does not support Parameter Contexts and they will be lost in "
521+
"Version Control".format())
522+
523+
524+
def enforce_min_ver(min_version, bool_response=False, service='nifi'):
504525
"""
505526
Raises an error if target NiFi environment is not minimum version
506527
Args:
507528
min_version (str): Version to check against
508529
bool_response (bool): If True, will return True instead of
509530
raising error
531+
service: nifi or registry
510532
511533
Returns:
512534
(bool) or (NotImplementedError)
513535
"""
514-
if check_version(min_version) == 1:
536+
if check_version(min_version, service=service) == 1:
515537
if not bool_response:
516538
raise NotImplementedError(
517539
"This function is not available "

nipyapi/versioning.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -240,36 +240,25 @@ def save_flow_ver(process_group, registry_client, bucket, flow_name=None,
240240
target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id')
241241
else:
242242
target_pg = process_group
243-
if nipyapi.utils.check_version('1.10.0') <= 0:
244-
body = nipyapi.nifi.StartVersionControlRequestEntity(
245-
process_group_revision=target_pg.revision,
246-
versioned_flow=nipyapi.nifi.VersionedFlowDTO(
243+
flow_dto = nipyapi.nifi.VersionedFlowDTO(
247244
bucket_id=bucket.identifier,
248245
comments=comment,
249246
description=desc,
250247
flow_name=flow_name,
251248
flow_id=flow_id,
252-
registry_id=registry_client.id,
253-
action='FORCE_COMMIT' if force else 'COMMIT'
249+
registry_id=registry_client.id
254250
)
255-
)
256-
else:
251+
if nipyapi.utils.check_version('1.10.0') <= 0:
257252
# no 'action' property in versions < 1.10
258-
body = nipyapi.nifi.StartVersionControlRequestEntity(
259-
process_group_revision=target_pg.revision,
260-
versioned_flow={
261-
'bucketId': bucket.identifier,
262-
'comments': comment,
263-
'description': desc,
264-
'flowName': flow_name,
265-
'flowId': flow_id,
266-
'registryId': registry_client.id
267-
}
268-
)
253+
flow_dto.action = 'FORCE_COMMIT' if force else 'COMMIT'
269254
with nipyapi.utils.rest_exceptions():
255+
nipyapi.utils.validate_parameters_versioning_support()
270256
return nipyapi.nifi.VersionsApi().save_to_flow_registry(
271257
id=target_pg.id,
272-
body=body
258+
body=nipyapi.nifi.StartVersionControlRequestEntity(
259+
process_group_revision=target_pg.revision,
260+
versioned_flow=flow_dto
261+
)
273262
)
274263

275264

@@ -523,13 +512,16 @@ def create_flow_version(flow, flow_snapshot, refresh=True):
523512
for obj in [target_bucket, target_flow]:
524513
for p in bad_params:
525514
obj.__setattr__(p, None)
515+
nipyapi.utils.validate_parameters_versioning_support()
526516
return nipyapi.registry.BucketFlowsApi().create_flow_version(
527517
bucket_id=target_bucket.identifier,
528518
flow_id=target_flow.identifier,
529519
body=nipyapi.registry.VersionedFlowSnapshot(
530520
flow=target_flow,
531521
bucket=target_bucket,
532522
flow_contents=flow_snapshot.flow_contents,
523+
parameter_contexts=flow_snapshot.parameter_contexts,
524+
external_controller_services=flow_snapshot.external_controller_services,
533525
snapshot_metadata=VfsMd(
534526
version=target_flow.version_count + 1,
535527
comments=flow_snapshot.snapshot_metadata.comments,
@@ -689,6 +681,7 @@ def import_flow_version(bucket_id, encoded_flow=None, file_path=None,
689681
" add this version to, or flow_name must be a unique "
690682
"name for a flow in this bucket, but not both")
691683
# Now write the new version
684+
nipyapi.utils.validate_parameters_versioning_support()
692685
return create_flow_version(
693686
flow=ver_flow,
694687
flow_snapshot=imported_flow,

resources/docker/tox-full/docker-compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ services:
3030
hostname: nifi
3131
ports:
3232
- "8080:8080"
33+
nifidev:
34+
image: apache/nifi:1.12.1
35+
container_name: nifidev
36+
hostname: nifidev
37+
ports:
38+
- "8081:8080"
3339
registry-010:
3440
image: apache/nifi-registry:0.1.0
3541
container_name: registry-010
@@ -54,3 +60,11 @@ services:
5460
- "18080:18080"
5561
environment:
5662
- NIFI_REGISTRY_WEB_HTTP_PORT=18080
63+
registrydev:
64+
image: apache/nifi-registry:0.7.0
65+
container_name: registrydev
66+
hostname: registrydev
67+
ports:
68+
- "18081:18081"
69+
environment:
70+
- NIFI_REGISTRY_WEB_HTTP_PORT=18081

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.16.0
2+
current_version = 0.16.1
33
commit = True
44
tag = True
55

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
with open('docs/history.rst') as history_file:
1212
history = history_file.read()
1313

14-
proj_version = '0.16.0'
14+
proj_version = '0.16.1'
1515

1616
with open('requirements.txt') as reqs_file:
1717
requirements = reqs_file.read().splitlines()

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -594,8 +594,8 @@ def fixture_flow_serde(request, tmpdir, fix_ver_flow):
594594
'FixtureFlowSerde',
595595
getattr(fix_ver_flow, '_fields') + ('filepath', 'json', 'yaml', 'raw')
596596
)
597-
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)\
598-
.join(test_ver_export_filename))
597+
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)
598+
.join(test_ver_export_filename))
599599
f_raw = nipyapi.versioning.get_flow_version(
600600
bucket_id=fix_ver_flow.bucket.identifier,
601601
flow_id=fix_ver_flow.flow.identifier,

tests/test_versioning.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import pytest
88
from deepdiff import DeepDiff
99
from tests import conftest
10-
from nipyapi import registry, nifi, versioning, canvas, utils, config
10+
from nipyapi import registry, nifi, versioning, canvas, utils, config, parameters
1111

1212

1313
def test_create_registry_client(regress_flow_reg):
@@ -139,7 +139,7 @@ def test_save_flow_ver(regress_flow_reg, fix_bucket, fix_pg, fix_proc):
139139
)
140140
assert isinstance(r2, nifi.VersionControlInformationEntity)
141141
assert r2.version_control_information.version > \
142-
r1.version_control_information.version
142+
r1.version_control_information.version
143143
with pytest.raises(ValueError):
144144
_ = versioning.save_flow_ver(
145145
process_group=f_pg,
@@ -188,7 +188,7 @@ def test_get_flow_in_bucket(regress_flow_reg, fix_ver_flow):
188188
'id'
189189
)
190190
assert isinstance(r1, registry.VersionedFlow)
191-
assert r1.identifier == fix_ver_flow.info.version_control_information.\
191+
assert r1.identifier == fix_ver_flow.info.version_control_information. \
192192
flow_id
193193
r2 = versioning.get_flow_in_bucket(fix_ver_flow.bucket.identifier,
194194
'fakenews', 'id')
@@ -394,10 +394,53 @@ def test_import_flow_version(regress_flow_reg, fix_flow_serde):
394394
) == {}
395395

396396

397+
def test_issue_229(regress_flow_reg, fix_bucket, fix_pg, fix_context):
398+
# test we can deploy and imported flow, issue 229
399+
if utils.enforce_min_ver('1.10.0', bool_response=True) or utils.enforce_min_ver('0.6.0', service='registry',
400+
bool_response=True):
401+
pass
402+
else:
403+
reg_client = conftest.ensure_registry_client(config.registry_local_name)
404+
bucket = fix_bucket()
405+
pg = fix_pg.generate()
406+
context = fix_context.generate()
407+
parameters.assign_context_to_process_group(pg, context.id)
408+
save_flow_ver = versioning.save_flow_ver(
409+
process_group=pg,
410+
registry_client=reg_client,
411+
bucket=bucket,
412+
flow_name=conftest.test_versioned_flow_name,
413+
comment='NiPyApi Test',
414+
desc='NiPyApi Test'
415+
)
416+
flow_raw = versioning.get_flow_version(
417+
bucket_id=bucket.identifier,
418+
flow_id=save_flow_ver.version_control_information.flow_id,
419+
export=True
420+
)
421+
# Check that it is being exported correctly
422+
# Older versions of Registry will drop unsupported parameterContext information
423+
if 'parameterContexts' in utils.load(flow_raw).keys():
424+
imported_flow = versioning.import_flow_version(
425+
bucket_id=bucket.identifier,
426+
encoded_flow=flow_raw,
427+
flow_name=conftest.test_versioned_flow_name + '_229'
428+
)
429+
deployed_flow = versioning.deploy_flow_version(
430+
parent_id=canvas.get_root_pg_id(),
431+
location=(0, 0),
432+
bucket_id=bucket.identifier,
433+
flow_id=imported_flow.flow.identifier,
434+
reg_client_id=reg_client.id,
435+
version=None
436+
)
437+
assert isinstance(deployed_flow, nifi.ProcessGroupEntity)
438+
439+
397440
def test_deploy_flow_version(regress_flow_reg, fix_ver_flow):
398441
r1 = versioning.deploy_flow_version(
399442
parent_id=canvas.get_root_pg_id(),
400-
location=(0,0),
443+
location=(0, 0),
401444
bucket_id=fix_ver_flow.bucket.identifier,
402445
flow_id=fix_ver_flow.flow.identifier,
403446
reg_client_id=fix_ver_flow.client.id,

0 commit comments

Comments
 (0)