Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions netbox/dcim/forms/bulk_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from utilities.forms import CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVTypedChoiceField, SlugField
from virtualization.models import Cluster
from wireless.choices import WirelessRoleChoices
from .common import ModuleCommonForm

__all__ = (
'CableCSVForm',
Expand Down Expand Up @@ -407,7 +408,7 @@ def __init__(self, data=None, *args, **kwargs):
self.fields['rack'].queryset = self.fields['rack'].queryset.filter(**params)


class ModuleCSVForm(NetBoxModelCSVForm):
class ModuleCSVForm(ModuleCommonForm, NetBoxModelCSVForm):
device = CSVModelChoiceField(
queryset=Device.objects.all(),
to_field_name='name'
Expand All @@ -420,11 +421,20 @@ class ModuleCSVForm(NetBoxModelCSVForm):
queryset=ModuleType.objects.all(),
to_field_name='model'
)
replicate_components = forms.BooleanField(
required=False,
help_text="Automatically populate components associated with this module type (default: true)"
)
adopt_components = forms.BooleanField(
required=False,
help_text="Adopt already existing components"
)

class Meta:
model = Module
fields = (
'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'comments',
'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'replicate_components',
'adopt_components', 'comments',
)

def __init__(self, data=None, *args, **kwargs):
Expand All @@ -435,6 +445,13 @@ def __init__(self, data=None, *args, **kwargs):
params = {f"device__{self.fields['device'].to_field_name}": data.get('device')}
self.fields['module_bay'].queryset = self.fields['module_bay'].queryset.filter(**params)

def clean_replicate_components(self):
# Make sure replicate_components is True when it's not included in the uploaded data
if 'replicate_components' not in self.data:
return True
else:
return self.cleaned_data['replicate_components']


class ChildDeviceCSVForm(BaseDeviceCSVForm):
parent = CSVModelChoiceField(
Expand Down
58 changes: 58 additions & 0 deletions netbox/dcim/forms/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

__all__ = (
'InterfaceCommonForm',
'ModuleCommonForm'
)


Expand Down Expand Up @@ -47,3 +48,60 @@ def clean(self):
'tagged_vlans': f"The tagged VLANs ({', '.join(invalid_vlans)}) must belong to the same site as "
f"the interface's parent device/VM, or they must be global"
})


class ModuleCommonForm(forms.Form):
def clean(self):
super().clean()

replicate_components = self.cleaned_data.get("replicate_components")
adopt_components = self.cleaned_data.get("adopt_components")
device = self.cleaned_data['device']
module_type = self.cleaned_data['module_type']
module_bay = self.cleaned_data['module_bay']

if adopt_components:
self.instance._adopt_components = True

# Bail out if we are not installing a new module or if we are not replicating components
if self.instance.pk or not replicate_components:
self.instance._disable_replication = True
return

for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports")
]:
# Prefetch installed components
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}

# Get the templates for the module type.
for template in getattr(module_type, templates).all():
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name and not module_bay.position:
raise forms.ValidationError(
"Cannot install module with placeholder values in a module bay with no position defined"
)

resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position)
existing_item = installed_components.get(resolved_name)

# It is not possible to adopt components already belonging to a module
if adopt_components and existing_item and existing_item.module:
raise forms.ValidationError(
f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs "
f"to a module"
)

# If we are not adopting components we error if the component exists
if not adopt_components and resolved_name in installed_components:
raise forms.ValidationError(
f"{template.component_model.__name__} - {resolved_name} already exists"
)
66 changes: 2 additions & 64 deletions netbox/dcim/forms/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from virtualization.models import Cluster, ClusterGroup
from wireless.models import WirelessLAN, WirelessLANGroup
from .common import InterfaceCommonForm
from .common import InterfaceCommonForm, ModuleCommonForm

__all__ = (
'CableForm',
Expand Down Expand Up @@ -657,7 +657,7 @@ def __init__(self, *args, **kwargs):
self.fields['position'].widget.choices = [(position, f'U{position}')]


class ModuleForm(NetBoxModelForm):
class ModuleForm(ModuleCommonForm, NetBoxModelForm):
device = DynamicModelChoiceField(
queryset=Device.objects.all(),
initial_params={
Expand Down Expand Up @@ -722,68 +722,6 @@ def __init__(self, *args, **kwargs):
self.fields['adopt_components'].initial = False
self.fields['adopt_components'].disabled = True

def save(self, *args, **kwargs):

# If replicate_components is False, disable automatic component replication on the instance
if self.instance.pk or not self.cleaned_data['replicate_components']:
self.instance._disable_replication = True

if self.cleaned_data['adopt_components']:
self.instance._adopt_components = True

return super().save(*args, **kwargs)

def clean(self):
super().clean()

replicate_components = self.cleaned_data.get("replicate_components")
adopt_components = self.cleaned_data.get("adopt_components")
device = self.cleaned_data['device']
module_type = self.cleaned_data['module_type']
module_bay = self.cleaned_data['module_bay']

# Bail out if we are not installing a new module or if we are not replicating components
if self.instance.pk or not replicate_components:
return

for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports")
]:
# Prefetch installed components
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}

# Get the templates for the module type.
for template in getattr(module_type, templates).all():
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name and not module_bay.position:
raise forms.ValidationError(
"Cannot install module with placeholder values in a module bay with no position defined"
)

resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position)
existing_item = installed_components.get(resolved_name)

# It is not possible to adopt components already belonging to a module
if adopt_components and existing_item and existing_item.module:
raise forms.ValidationError(
f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs "
f"to a module"
)

# If we are not adopting components we error if the component exists
if not adopt_components and resolved_name in installed_components:
raise forms.ValidationError(
f"{template.component_model.__name__} - {resolved_name} already exists"
)


class CableForm(TenancyForm, NetBoxModelForm):

Expand Down
90 changes: 90 additions & 0 deletions netbox/dcim/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,53 @@ def test_module_component_replication(self):
self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(Interface.objects.filter(device=device).count(), 5)

@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_bulk_replication(self):
self.add_permissions('dcim.add_module')

# Add 5 InterfaceTemplates to a ModuleType
module_type = ModuleType.objects.first()
interface_templates = [
InterfaceTemplate(module_type=module_type, name=f'Interface {i}') for i in range(1, 6)
]
InterfaceTemplate.objects.bulk_create(interface_templates)

form_data = self.form_data.copy()
device = Device.objects.get(pk=form_data['device'])

# Create a module *without* replicating components
module_bay = ModuleBay.objects.get(pk=form_data['module_bay'])
csv_data = [
"device,module_bay,module_type,replicate_components",
f"{device.name},{module_bay.name},{module_type.model},false"
]
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}

initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)
self.assertEqual(Interface.objects.filter(device=device).count(), 0)

# Create a second module (in the next bay) with replicated components
module_bay = ModuleBay.objects.get(pk=(form_data['module_bay'] + 1))
csv_data[1] = f"{device.name},{module_bay.name},{module_type.model},true"
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}

initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)
self.assertEqual(Interface.objects.filter(device=device).count(), 5)

@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_component_adoption(self):
self.add_permissions('dcim.add_module')
Expand Down Expand Up @@ -1885,6 +1932,49 @@ def test_module_component_adoption(self):
# Check that the Interface now has a module
self.assertIsNotNone(interface.module)

@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_bulk_adoption(self):
self.add_permissions('dcim.add_module')

interface_name = "Interface-1"

# Add an interface to the ModuleType
module_type = ModuleType.objects.first()
InterfaceTemplate(module_type=module_type, name=interface_name).save()

form_data = self.form_data.copy()
device = Device.objects.get(pk=form_data['device'])

# Create an interface to be adopted
interface = Interface(device=device, name=interface_name, type=InterfaceTypeChoices.TYPE_10GE_FIXED)
interface.save()

# Ensure that interface is created with no module
self.assertIsNone(interface.module)

# Create a module with adopted components
Copy link
Contributor

@kkthxbye-code kkthxbye-code Dec 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small thing, this testcase actually fails on my PC. It's trying to install in the first modulebay of the device, which is already occupied from the setup. If I change to another bay it passes. Not sure if it's something with my setup or from the rebase.

Testcase that fails:

self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)

AssertionError: 3 != 4

https://github.com/netbox-community/netbox/pull/9498/files#diff-fb7b770d4e8b9f98f2376f15c9e9fac16cb773b6d80e760be04c9ad2fd45ccf2R1978

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Maybe related to 9ef24d3? In any case, changing it to an empty bay appears to fix the test.

module_bay = ModuleBay.objects.get(device=device, name='Module Bay 4')
csv_data = [
"device,module_bay,module_type,replicate_components,adopt_components",
f"{device.name},{module_bay.name},{module_type.model},false,true"
]
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}

initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)

# Re-retrieve interface to get new module id
interface.refresh_from_db()

# Check that the Interface now has a module
self.assertIsNotNone(interface.module)


class ConsolePortTestCase(ViewTestCases.DeviceComponentViewTestCase):
model = ConsolePort
Expand Down