From ef33aa9fce5669dd654a74f0b7920512658609f9 Mon Sep 17 00:00:00 2001 From: Arthur Date: Mon, 20 Mar 2023 13:09:11 -0700 Subject: [PATCH 01/16] 11617 validate csv column headers --- netbox/utilities/forms/forms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/netbox/utilities/forms/forms.py b/netbox/utilities/forms/forms.py index 9f84e100f67..6d593cbf55a 100644 --- a/netbox/utilities/forms/forms.py +++ b/netbox/utilities/forms/forms.py @@ -2,6 +2,7 @@ from django import forms from django.utils.translation import gettext as _ + from .mixins import BootstrapMixin __all__ = ( From 18b98e641c19be187a01cf991a0e5cf35a754028 Mon Sep 17 00:00:00 2001 From: Arthur Date: Mon, 24 Apr 2023 12:57:34 -0700 Subject: [PATCH 02/16] 11617 validate bulk import fields --- netbox/netbox/views/generic/bulk_views.py | 9 +++++++++ netbox/utilities/forms/utils.py | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index 35caa31b362..ae17aca5f34 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -21,6 +21,7 @@ from utilities.exceptions import AbortRequest, AbortTransaction, PermissionsViolation from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms.bulk_import import BulkImportForm +from utilities.forms.utils import validate_import_headers from utilities.htmx import is_embedded, is_htmx from utilities.permissions import get_permission_for_model from utilities.utils import get_viewname @@ -395,6 +396,14 @@ def create_and_update_objects(self, form, request): model_form_kwargs['headers'] = form._csv_headers # Add CSV headers model_form = self.model_form(**model_form_kwargs) + # validate the fields (required fields are present and no unknown fields) + form_fields = model_form.fields + required_fields = [ + name for name, field in form_fields.items() if field.required + ] + headers = list(record.keys()) + validate_import_headers(headers, form_fields, required_fields) + # When updating, omit all form fields other than those specified in the record. (No # fields are required when modifying an existing object.) if object_id: diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index 4d737f16321..5f2c869994c 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -18,7 +18,7 @@ 'parse_numeric_range', 'restrict_form_fields', 'parse_csv', - 'validate_csv', + 'validate_import_headers', ) @@ -242,7 +242,7 @@ def parse_csv(reader): return headers, records -def validate_csv(headers, fields, required_fields): +def validate_import_headers(headers, fields, required_fields): """ Validate that parsed csv data conforms to the object's available fields. Raise validation errors if parsed csv data contains invalid headers or does not contain required headers. From 702cfc1fac2ca55528dc0d7a30c5aa87595c1559 Mon Sep 17 00:00:00 2001 From: Arthur Date: Mon, 24 Apr 2023 13:09:31 -0700 Subject: [PATCH 03/16] 11617 validate bulk import fields --- netbox/netbox/views/generic/bulk_views.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index ae17aca5f34..ce2d79439c5 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -22,7 +22,8 @@ from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms.bulk_import import BulkImportForm from utilities.forms.utils import validate_import_headers -from utilities.htmx import is_embedded, is_htmx +from utilities.htmx import is_embedded +from utilities.htmx import is_htmx from utilities.permissions import get_permission_for_model from utilities.utils import get_viewname from utilities.views import GetReturnURLMixin From f7b3dad29cc9134300064541471309f28e2b44a2 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 08:07:09 -0700 Subject: [PATCH 04/16] 11617 update header processing for related fields --- netbox/netbox/views/generic/bulk_views.py | 9 ++++-- netbox/utilities/forms/utils.py | 35 ++++++++++++++--------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index ce2d79439c5..0d670eb7729 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -21,7 +21,7 @@ from utilities.exceptions import AbortRequest, AbortTransaction, PermissionsViolation from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms.bulk_import import BulkImportForm -from utilities.forms.utils import validate_import_headers +from utilities.forms.utils import headers_to_dict, validate_import_headers from utilities.htmx import is_embedded from utilities.htmx import is_htmx from utilities.permissions import get_permission_for_model @@ -393,8 +393,10 @@ def create_and_update_objects(self, form, request): 'data': record, 'instance': instance, } + headers = None if hasattr(form, '_csv_headers'): - model_form_kwargs['headers'] = form._csv_headers # Add CSV headers + headers = form._csv_headers + model_form_kwargs['headers'] = headers # Add CSV headers model_form = self.model_form(**model_form_kwargs) # validate the fields (required fields are present and no unknown fields) @@ -402,7 +404,8 @@ def create_and_update_objects(self, form, request): required_fields = [ name for name, field in form_fields.items() if field.required ] - headers = list(record.keys()) + if not headers: + headers = headers_to_dict(list(record.keys())) validate_import_headers(headers, form_fields, required_fields) # When updating, omit all form fields other than those specified in the record. (No diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index 5f2c869994c..d92635e0287 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -205,29 +205,38 @@ def restrict_form_fields(form, user, action='view'): field.queryset = field.queryset.restrict(user, action) -def parse_csv(reader): +def headers_to_dict(headers): """ - Parse a csv_reader object into a headers dictionary and a list of records dictionaries. Raise an error - if the records are formatted incorrectly. Return headers and records as a tuple. + Create a dictionary mapping each header to an optional "to" field specifying how + the related object is being referenced. For example, importing a Device might use a + `site.slug` header, to indicate the related site is being referenced by its slug. """ - records = [] - headers = {} - - # Consume the first line of CSV data as column headers. Create a dictionary mapping each header to an optional - # "to" field specifying how the related object is being referenced. For example, importing a Device might use a - # `site.slug` header, to indicate the related site is being referenced by its slug. - - for header in next(reader): + header_dict = {} + for header in headers: header = header.strip() if '.' in header: field, to_field = header.split('.', 1) if field in headers: raise forms.ValidationError(f'Duplicate or conflicting column header for "{field}"') - headers[field] = to_field + header_dict[field] = to_field else: if header in headers: raise forms.ValidationError(f'Duplicate or conflicting column header for "{header}"') - headers[header] = None + header_dict[header] = None + + return header_dict + + +def parse_csv(reader): + """ + Parse a csv_reader object into a headers dictionary and a list of records dictionaries. Raise an error + if the records are formatted incorrectly. Return headers and records as a tuple. + """ + records = [] + headers = {} + + # Consume the first line of CSV data as column headers. + headers = headers_to_dict(list(reader)) # Parse CSV rows into a list of dictionaries mapped from the column headers. for i, row in enumerate(reader, start=1): From a9f62c2e6e27008abe77db779d70991595e58bbe Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 08:26:16 -0700 Subject: [PATCH 05/16] 11617 update header processing for related fields --- netbox/utilities/forms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index d92635e0287..996d8519563 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -236,7 +236,7 @@ def parse_csv(reader): headers = {} # Consume the first line of CSV data as column headers. - headers = headers_to_dict(list(reader)) + headers = headers_to_dict(list(next(reader))) # Parse CSV rows into a list of dictionaries mapped from the column headers. for i, row in enumerate(reader, start=1): From f46ac8b06a5dc8bbd42713f1bd3220709aa70906 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 08:40:55 -0700 Subject: [PATCH 06/16] 11617 update header processing for related fields --- netbox/utilities/forms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index 996d8519563..b4ad2fab28b 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -220,7 +220,7 @@ def headers_to_dict(headers): raise forms.ValidationError(f'Duplicate or conflicting column header for "{field}"') header_dict[field] = to_field else: - if header in headers: + if header in header_dict: raise forms.ValidationError(f'Duplicate or conflicting column header for "{header}"') header_dict[header] = None From b415c157cce81ac244053961a0bfec021dbf1689 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 08:53:12 -0700 Subject: [PATCH 07/16] 11617 fix header field dupe test --- netbox/utilities/forms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index b4ad2fab28b..1b308899819 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -216,7 +216,7 @@ def headers_to_dict(headers): header = header.strip() if '.' in header: field, to_field = header.split('.', 1) - if field in headers: + if field in header_dict: raise forms.ValidationError(f'Duplicate or conflicting column header for "{field}"') header_dict[field] = to_field else: From b8d7c8bc0d5c99f2d4e950c0eeedd38240633f52 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 11:43:59 -0700 Subject: [PATCH 08/16] 11617 special case DeviceType and ModuleType --- netbox/netbox/views/generic/bulk_views.py | 3 ++- netbox/utilities/forms/utils.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index 0d670eb7729..0e7bfdcd785 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -15,6 +15,7 @@ from django.utils.safestring import mark_safe from django_tables2.export import TableExport +from dcim.forms.bulk_import import DeviceTypeImportForm, ModuleTypeImportForm from extras.models import ExportTemplate from extras.signals import clear_webhooks from utilities.error_handlers import handle_protectederror @@ -406,7 +407,7 @@ def create_and_update_objects(self, form, request): ] if not headers: headers = headers_to_dict(list(record.keys())) - validate_import_headers(headers, form_fields, required_fields) + validate_import_headers(headers, form_fields, required_fields, allow_extra_columns=isinstance(model_form, (DeviceTypeImportForm, ModuleTypeImportForm))) # When updating, omit all form fields other than those specified in the record. (No # fields are required when modifying an existing object.) diff --git a/netbox/utilities/forms/utils.py b/netbox/utilities/forms/utils.py index 1b308899819..eb880c4fedb 100644 --- a/netbox/utilities/forms/utils.py +++ b/netbox/utilities/forms/utils.py @@ -251,7 +251,7 @@ def parse_csv(reader): return headers, records -def validate_import_headers(headers, fields, required_fields): +def validate_import_headers(headers, fields, required_fields, allow_extra_columns=False): """ Validate that parsed csv data conforms to the object's available fields. Raise validation errors if parsed csv data contains invalid headers or does not contain required headers. @@ -262,7 +262,7 @@ def validate_import_headers(headers, fields, required_fields): if field == "id": is_update = True continue - if field not in fields: + if (not allow_extra_columns) and (field not in fields): raise forms.ValidationError(f'Unexpected column header "{field}" found.') if to_field and not hasattr(fields[field], 'to_field_name'): raise forms.ValidationError(f'Column "{field}" is not a related object; cannot use dots') From 5346583ccccc73aea4bce702e6d26ae33ce57007 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 13:04:58 -0700 Subject: [PATCH 09/16] 11617 add tests for yaml, json bulk import --- netbox/utilities/testing/views.py | 251 +++++++++++++++++++++++++++++- 1 file changed, 244 insertions(+), 7 deletions(-) diff --git a/netbox/utilities/testing/views.py b/netbox/utilities/testing/views.py index dc17548a265..e2a703baa03 100644 --- a/netbox/utilities/testing/views.py +++ b/netbox/utilities/testing/views.py @@ -1,4 +1,7 @@ import csv +import io +import json +import yaml from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist @@ -553,10 +556,29 @@ class BulkImportObjectsViewTestCase(ModelViewTestCase): def _get_csv_data(self): return '\n'.join(self.csv_data) + def _get_yaml_data(self): + data = [*csv.DictReader(io.StringIO(self._get_csv_data()))] + return yaml.dump(data) + + def _get_json_data(self): + data = [*csv.DictReader(io.StringIO(self._get_csv_data()))] + return json.dumps(data) + def _get_update_csv_data(self): - return self.csv_update_data, '\n'.join(self.csv_update_data) + return '\n'.join(self.csv_update_data) + + def _get_update_yaml_data(self): + data = [*csv.DictReader(io.StringIO(self._get_update_csv_data()))] + return yaml.dump(data) - def test_bulk_import_objects_without_permission(self): + def _get_update_json_data(self): + data = [*csv.DictReader(io.StringIO(self._get_update_csv_data()))] + return json.dumps(data) + + def _get_update_array(self): + return self.csv_update_data + + def test_bulk_import_objects_without_permission_csv(self): data = { 'data': self._get_csv_data(), 'format': 'csv', @@ -571,8 +593,38 @@ def test_bulk_import_objects_without_permission(self): with disable_warnings('django.request'): self.assertHttpStatus(response, 403) + def test_bulk_import_objects_without_permission_yaml(self): + data = { + 'data': self._get_yaml_data(), + 'format': 'yaml', + } + + # Test GET without permission + with disable_warnings('django.request'): + self.assertHttpStatus(self.client.get(self._get_url('import')), 403) + + # Try POST without permission + response = self.client.post(self._get_url('import'), data) + with disable_warnings('django.request'): + self.assertHttpStatus(response, 403) + + def test_bulk_import_objects_without_permission_json(self): + data = { + 'data': self._get_json_data(), + 'format': 'json', + } + + # Test GET without permission + with disable_warnings('django.request'): + self.assertHttpStatus(self.client.get(self._get_url('import')), 403) + + # Try POST without permission + response = self.client.post(self._get_url('import'), data) + with disable_warnings('django.request'): + self.assertHttpStatus(response, 403) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) - def test_bulk_import_objects_with_permission(self): + def test_bulk_import_objects_with_permission_csv(self): initial_count = self._get_queryset().count() data = { 'data': self._get_csv_data(), @@ -596,15 +648,64 @@ def test_bulk_import_objects_with_permission(self): self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) - def test_bulk_update_objects_with_permission(self): + def test_bulk_import_objects_with_permission_yaml(self): + initial_count = self._get_queryset().count() + data = { + 'data': self._get_yaml_data(), + 'format': 'yaml', + } + + # Assign model-level permission + obj_perm = ObjectPermission( + name='Test permission', + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_import_objects_with_permission_json(self): + initial_count = self._get_queryset().count() + data = { + 'data': self._get_json_data(), + 'format': 'json', + } + + # Assign model-level permission + obj_perm = ObjectPermission( + name='Test permission', + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_update_objects_with_permission_csv(self): if not hasattr(self, 'csv_update_data'): raise NotImplementedError("The test must define csv_update_data.") initial_count = self._get_queryset().count() - array, csv_data = self._get_update_csv_data() + array = self._get_update_array() + update_data = self._get_update_csv_data() data = { 'format': ImportFormatChoices.CSV, - 'data': csv_data, + 'data': update_data, } # Assign model-level permission @@ -633,7 +734,83 @@ def test_bulk_update_objects_with_permission(self): self.assertEqual(value, value) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) - def test_bulk_import_objects_with_constrained_permission(self): + def test_bulk_update_objects_with_permission_yaml(self): + if not hasattr(self, 'csv_update_data'): + raise NotImplementedError("The test must define csv_update_data.") + + initial_count = self._get_queryset().count() + array = self._get_update_array() + update_data = self._get_update_yaml_data() + data = { + 'format': ImportFormatChoices.YAML, + 'data': update_data, + } + + # Assign model-level permission + obj_perm = ObjectPermission( + name='Test permission', + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(initial_count, self._get_queryset().count()) + + reader = csv.DictReader(array, delimiter=',') + check_data = list(reader) + for line in check_data: + obj = self.model.objects.get(id=line["id"]) + for attr, value in line.items(): + if attr != "id": + field = self.model._meta.get_field(attr) + value = getattr(obj, attr) + # cannot verify FK fields as don't know what name the CSV maps to + if value is not None and not isinstance(field, ForeignKey): + self.assertEqual(value, value) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_update_objects_with_permission_json(self): + if not hasattr(self, 'csv_update_data'): + raise NotImplementedError("The test must define csv_update_data.") + + initial_count = self._get_queryset().count() + array = self._get_update_array() + update_data = self._get_update_json_data() + data = { + 'format': ImportFormatChoices.JSON, + 'data': update_data, + } + + # Assign model-level permission + obj_perm = ObjectPermission( + name='Test permission', + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(initial_count, self._get_queryset().count()) + + reader = csv.DictReader(array, delimiter=',') + check_data = list(reader) + for line in check_data: + obj = self.model.objects.get(id=line["id"]) + for attr, value in line.items(): + if attr != "id": + field = self.model._meta.get_field(attr) + value = getattr(obj, attr) + # cannot verify FK fields as don't know what name the CSV maps to + if value is not None and not isinstance(field, ForeignKey): + self.assertEqual(value, value) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_import_objects_with_constrained_permission_csv(self): initial_count = self._get_queryset().count() data = { 'data': self._get_csv_data(), @@ -662,6 +839,66 @@ def test_bulk_import_objects_with_constrained_permission(self): self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_import_objects_with_constrained_permission_yaml(self): + initial_count = self._get_queryset().count() + data = { + 'data': self._get_yaml_data(), + 'format': 'yaml', + } + + # Assign constrained permission + obj_perm = ObjectPermission( + name='Test permission', + constraints={'pk': 0}, # Dummy permission to deny all + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Attempt to import non-permitted objects + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count) + + # Update permission constraints + obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all + obj_perm.save() + + # Import permitted objects + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_bulk_import_objects_with_constrained_permission_json(self): + initial_count = self._get_queryset().count() + data = { + 'data': self._get_json_data(), + 'format': 'json', + } + + # Assign constrained permission + obj_perm = ObjectPermission( + name='Test permission', + constraints={'pk': 0}, # Dummy permission to deny all + actions=['add'] + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Attempt to import non-permitted objects + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count) + + # Update permission constraints + obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all + obj_perm.save() + + # Import permitted objects + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + class BulkEditObjectsViewTestCase(ModelViewTestCase): """ Edit multiple instances. From 1f410cf35a3f6a684facbf3722e4307f9a545cf4 Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 25 Apr 2023 13:38:05 -0700 Subject: [PATCH 10/16] 11617 add tests for invalid headers to csv,yaml,json --- netbox/netbox/tests/test_import.py | 224 +++++++++++++++++++++- netbox/netbox/views/generic/bulk_views.py | 3 +- 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/netbox/netbox/tests/test_import.py b/netbox/netbox/tests/test_import.py index 73f775bd76f..d5d8861f99e 100644 --- a/netbox/netbox/tests/test_import.py +++ b/netbox/netbox/tests/test_import.py @@ -1,3 +1,8 @@ +import csv +import io +import json +import yaml + from django.contrib.contenttypes.models import ContentType from django.test import override_settings @@ -17,8 +22,16 @@ def setUpTestData(cls): def _get_csv_data(self, csv_data): return '\n'.join(csv_data) + def _get_yaml_data(self, csv_data): + data = [*csv.DictReader(io.StringIO(self._get_csv_data(csv_data)))] + return yaml.dump(data) + + def _get_json_data(self, csv_data): + data = [*csv.DictReader(io.StringIO(self._get_csv_data(csv_data)))] + return json.dumps(data) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) - def test_valid_tags(self): + def test_valid_tags_csv(self): csv_data = ( 'name,slug,tags', 'Region 1,region-1,"alpha,bravo"', @@ -61,7 +74,93 @@ def test_valid_tags(self): self.assertEqual(regions[3].tags.count(), 0) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) - def test_invalid_tags(self): + def test_valid_tags_yaml(self): + csv_data = ( + 'name,slug,tags', + 'Region 1,region-1,"alpha,bravo"', + 'Region 2,region-2,"charlie,delta"', + 'Region 3,region-3,echo', + 'Region 4,region-4,', + ) + + data = { + 'format': ImportFormatChoices.YAML, + 'data': self._get_yaml_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + regions = Region.objects.all() + self.assertEqual(regions.count(), 4) + region = Region.objects.get(slug="region-4") + self.assertEqual( + list(regions[0].tags.values_list('name', flat=True)), + ['Alpha', 'Bravo'] + ) + self.assertEqual( + list(regions[1].tags.values_list('name', flat=True)), + ['Charlie', 'Delta'] + ) + self.assertEqual( + list(regions[2].tags.values_list('name', flat=True)), + ['Echo'] + ) + self.assertEqual(regions[3].tags.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_valid_tags_json(self): + csv_data = ( + 'name,slug,tags', + 'Region 1,region-1,"alpha,bravo"', + 'Region 2,region-2,"charlie,delta"', + 'Region 3,region-3,echo', + 'Region 4,region-4,', + ) + + data = { + 'format': ImportFormatChoices.JSON, + 'data': self._get_json_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + regions = Region.objects.all() + self.assertEqual(regions.count(), 4) + region = Region.objects.get(slug="region-4") + self.assertEqual( + list(regions[0].tags.values_list('name', flat=True)), + ['Alpha', 'Bravo'] + ) + self.assertEqual( + list(regions[1].tags.values_list('name', flat=True)), + ['Charlie', 'Delta'] + ) + self.assertEqual( + list(regions[2].tags.values_list('name', flat=True)), + ['Echo'] + ) + self.assertEqual(regions[3].tags.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_tags_csv(self): csv_data = ( 'name,slug,tags', 'Region 1,region-1,"Alpha,Bravo"', # Valid @@ -85,3 +184,124 @@ def test_invalid_tags(self): # Test POST with permission self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) self.assertEqual(Region.objects.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_tags_yaml(self): + csv_data = ( + 'name,slug,tags', + 'Region 1,region-1,"Alpha,Bravo"', # Valid + 'Region 2,region-2,"Alpha,Tango"', # Invalid + ) + + data = { + 'format': ImportFormatChoices.YAML, + 'data': self._get_yaml_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(Region.objects.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_tags_json(self): + csv_data = ( + 'name,slug,tags', + 'Region 1,region-1,"Alpha,Bravo"', # Valid + 'Region 2,region-2,"Alpha,Tango"', # Invalid + ) + + data = { + 'format': ImportFormatChoices.JSON, + 'data': self._get_json_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Try GET with model-level permission + self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + + # Test POST with permission + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(Region.objects.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_header_csv(self): + csv_data = ( + 'name,slug,tags,xxx', + 'Region 1,region-1,"alpha,bravo",yyy', + ) + + data = { + 'format': ImportFormatChoices.CSV, + 'data': self._get_csv_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Test POST with permission + ret = self.client.post(self._get_url('import'), data) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(Region.objects.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_header_yaml(self): + csv_data = ( + 'name,slug,tags,xxx', + 'Region 1,region-1,"alpha,bravo",yyy', + ) + + data = { + 'format': ImportFormatChoices.YAML, + 'data': self._get_yaml_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Test POST with permission + ret = self.client.post(self._get_url('import'), data) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(Region.objects.count(), 0) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_invalid_header_json(self): + csv_data = ( + 'name,slug,tags,xxx', + 'Region 1,region-1,"alpha,bravo",yyy', + ) + + data = { + 'format': ImportFormatChoices.JSON, + 'data': self._get_json_data(csv_data), + } + + # Assign model-level permission + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) + + # Test POST with permission + ret = self.client.post(self._get_url('import'), data) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertEqual(Region.objects.count(), 0) diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index 0e7bfdcd785..1334d2c19ed 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -23,8 +23,7 @@ from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms.bulk_import import BulkImportForm from utilities.forms.utils import headers_to_dict, validate_import_headers -from utilities.htmx import is_embedded -from utilities.htmx import is_htmx +from utilities.htmx import is_embedded, is_htmx from utilities.permissions import get_permission_for_model from utilities.utils import get_viewname from utilities.views import GetReturnURLMixin From 760b85b841ab4a11431c83f5be20b2600dcdd02a Mon Sep 17 00:00:00 2001 From: Arthur Date: Wed, 16 Aug 2023 12:59:28 -0700 Subject: [PATCH 11/16] 11617 fix test cases --- netbox/netbox/tests/test_import.py | 12 ++++++------ netbox/netbox/views/generic/bulk_views.py | 7 ++++++- netbox/utilities/testing/views.py | 12 ++++++------ 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/netbox/netbox/tests/test_import.py b/netbox/netbox/tests/test_import.py index d5d8861f99e..5b767c5a0d6 100644 --- a/netbox/netbox/tests/test_import.py +++ b/netbox/netbox/tests/test_import.py @@ -240,8 +240,8 @@ def test_invalid_tags_json(self): @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) def test_invalid_header_csv(self): csv_data = ( - 'name,slug,tags,xxx', - 'Region 1,region-1,"alpha,bravo",yyy', + 'name,slug,xxx', + 'Region 1,region-1,yyy', ) data = { @@ -263,8 +263,8 @@ def test_invalid_header_csv(self): @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) def test_invalid_header_yaml(self): csv_data = ( - 'name,slug,tags,xxx', - 'Region 1,region-1,"alpha,bravo",yyy', + 'name,slug,xxx', + 'Region 1,region-1,yyy', ) data = { @@ -286,8 +286,8 @@ def test_invalid_header_yaml(self): @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) def test_invalid_header_json(self): csv_data = ( - 'name,slug,tags,xxx', - 'Region 1,region-1,"alpha,bravo",yyy', + 'name,slug,xxx', + 'Region 1,region-1,yyy', ) data = { diff --git a/netbox/netbox/views/generic/bulk_views.py b/netbox/netbox/views/generic/bulk_views.py index 1334d2c19ed..3884fb4017a 100644 --- a/netbox/netbox/views/generic/bulk_views.py +++ b/netbox/netbox/views/generic/bulk_views.py @@ -404,8 +404,13 @@ def create_and_update_objects(self, form, request): required_fields = [ name for name, field in form_fields.items() if field.required ] + if not headers: - headers = headers_to_dict(list(record.keys())) + keys = list(record.keys()) + if object_id: + keys.append("id") + headers = headers_to_dict(keys) + validate_import_headers(headers, form_fields, required_fields, allow_extra_columns=isinstance(model_form, (DeviceTypeImportForm, ModuleTypeImportForm))) # When updating, omit all form fields other than those specified in the record. (No diff --git a/netbox/utilities/testing/views.py b/netbox/utilities/testing/views.py index e2a703baa03..3e828c6a662 100644 --- a/netbox/utilities/testing/views.py +++ b/netbox/utilities/testing/views.py @@ -668,7 +668,7 @@ def test_bulk_import_objects_with_permission_yaml(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) @@ -692,7 +692,7 @@ def test_bulk_import_objects_with_permission_json(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) @@ -756,7 +756,7 @@ def test_bulk_update_objects_with_permission_yaml(self): obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(initial_count, self._get_queryset().count()) reader = csv.DictReader(array, delimiter=',') @@ -794,7 +794,7 @@ def test_bulk_update_objects_with_permission_json(self): obj_perm.object_types.add(ContentType.objects.get_for_model(self.model)) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(initial_count, self._get_queryset().count()) reader = csv.DictReader(array, delimiter=',') @@ -866,7 +866,7 @@ def test_bulk_import_objects_with_constrained_permission_yaml(self): obj_perm.save() # Import permitted objects - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) @@ -896,7 +896,7 @@ def test_bulk_import_objects_with_constrained_permission_json(self): obj_perm.save() # Import permitted objects - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) class BulkEditObjectsViewTestCase(ModelViewTestCase): From 1b9e0a4c8bf6d8431fd018c05899598764e00d48 Mon Sep 17 00:00:00 2001 From: Arthur Date: Wed, 16 Aug 2023 14:55:15 -0700 Subject: [PATCH 12/16] 11617 fix test cases --- netbox/netbox/tests/test_import.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/netbox/netbox/tests/test_import.py b/netbox/netbox/tests/test_import.py index 5b767c5a0d6..6290a16b83e 100644 --- a/netbox/netbox/tests/test_import.py +++ b/netbox/netbox/tests/test_import.py @@ -141,7 +141,7 @@ def test_valid_tags_json(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) regions = Region.objects.all() self.assertEqual(regions.count(), 4) region = Region.objects.get(slug="region-4") @@ -208,7 +208,7 @@ def test_invalid_tags_yaml(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) self.assertEqual(Region.objects.count(), 0) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) From f303b2f257eca25e377c7e7e86f44bf07989276b Mon Sep 17 00:00:00 2001 From: Arthur Date: Wed, 16 Aug 2023 15:05:38 -0700 Subject: [PATCH 13/16] 11617 fix test cases --- netbox/netbox/tests/test_import.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netbox/netbox/tests/test_import.py b/netbox/netbox/tests/test_import.py index 6290a16b83e..af4d025d4d7 100644 --- a/netbox/netbox/tests/test_import.py +++ b/netbox/netbox/tests/test_import.py @@ -98,7 +98,7 @@ def test_valid_tags_yaml(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) regions = Region.objects.all() self.assertEqual(regions.count(), 4) region = Region.objects.get(slug="region-4") From 3b6ef4455dae4a5de990783e70a8feda1f840d7c Mon Sep 17 00:00:00 2001 From: Arthur Date: Wed, 16 Aug 2023 15:24:28 -0700 Subject: [PATCH 14/16] 11617 fix test cases - disable bulk import for L2VPNTerminationTestCase --- netbox/ipam/tests/test_views.py | 2 +- netbox/netbox/tests/test_import.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/netbox/ipam/tests/test_views.py b/netbox/ipam/tests/test_views.py index c9128c0f696..7fe8f104dd9 100644 --- a/netbox/ipam/tests/test_views.py +++ b/netbox/ipam/tests/test_views.py @@ -1032,7 +1032,7 @@ class L2VPNTerminationTestCase( ViewTestCases.EditObjectViewTestCase, ViewTestCases.DeleteObjectViewTestCase, ViewTestCases.ListObjectsViewTestCase, - ViewTestCases.BulkImportObjectsViewTestCase, + # ViewTestCases.BulkImportObjectsViewTestCase, ViewTestCases.BulkDeleteObjectsViewTestCase, ): diff --git a/netbox/netbox/tests/test_import.py b/netbox/netbox/tests/test_import.py index af4d025d4d7..9b4a1ad8997 100644 --- a/netbox/netbox/tests/test_import.py +++ b/netbox/netbox/tests/test_import.py @@ -208,7 +208,7 @@ def test_invalid_tags_yaml(self): self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302) + self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) self.assertEqual(Region.objects.count(), 0) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) From 7d048b38e8f649486361e817fccbe938c9fe15a9 Mon Sep 17 00:00:00 2001 From: Arthur Date: Wed, 16 Aug 2023 15:50:20 -0700 Subject: [PATCH 15/16] 11617 fix test cases - fix bulk import for L2VPNTerminationTestCase --- netbox/ipam/forms/bulk_import.py | 6 ++++-- netbox/ipam/tests/test_views.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/netbox/ipam/forms/bulk_import.py b/netbox/ipam/forms/bulk_import.py index 3bce26249a5..785390e2d4f 100644 --- a/netbox/ipam/forms/bulk_import.py +++ b/netbox/ipam/forms/bulk_import.py @@ -548,9 +548,11 @@ def clean(self): if self.cleaned_data.get('device') and self.cleaned_data.get('virtual_machine'): raise ValidationError('Cannot import device and VM interface terminations simultaneously.') - if not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')): + if not self.instance and not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')): raise ValidationError('Each termination must specify either an interface or a VLAN.') if self.cleaned_data.get('interface') and self.cleaned_data.get('vlan'): raise ValidationError('Cannot assign both an interface and a VLAN.') - self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan') + # if this is an update we might not have interface or vlan in the form data + if self.cleaned_data.get('interface') or self.cleaned_data.get('vlan'): + self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan') diff --git a/netbox/ipam/tests/test_views.py b/netbox/ipam/tests/test_views.py index 7fe8f104dd9..c9128c0f696 100644 --- a/netbox/ipam/tests/test_views.py +++ b/netbox/ipam/tests/test_views.py @@ -1032,7 +1032,7 @@ class L2VPNTerminationTestCase( ViewTestCases.EditObjectViewTestCase, ViewTestCases.DeleteObjectViewTestCase, ViewTestCases.ListObjectsViewTestCase, - # ViewTestCases.BulkImportObjectsViewTestCase, + ViewTestCases.BulkImportObjectsViewTestCase, ViewTestCases.BulkDeleteObjectsViewTestCase, ): From fae151d1643ad9b1ebdc842e0783fc9a931ac2b5 Mon Sep 17 00:00:00 2001 From: Arthur Date: Thu, 17 Aug 2023 08:06:07 -0700 Subject: [PATCH 16/16] 11617 remove l2vpntermination fix --- netbox/ipam/forms/bulk_import.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/netbox/ipam/forms/bulk_import.py b/netbox/ipam/forms/bulk_import.py index 785390e2d4f..3bce26249a5 100644 --- a/netbox/ipam/forms/bulk_import.py +++ b/netbox/ipam/forms/bulk_import.py @@ -548,11 +548,9 @@ def clean(self): if self.cleaned_data.get('device') and self.cleaned_data.get('virtual_machine'): raise ValidationError('Cannot import device and VM interface terminations simultaneously.') - if not self.instance and not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')): + if not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')): raise ValidationError('Each termination must specify either an interface or a VLAN.') if self.cleaned_data.get('interface') and self.cleaned_data.get('vlan'): raise ValidationError('Cannot assign both an interface and a VLAN.') - # if this is an update we might not have interface or vlan in the form data - if self.cleaned_data.get('interface') or self.cleaned_data.get('vlan'): - self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan') + self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')