Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
Expand Down Expand Up @@ -130,6 +131,28 @@ def clean(self):
'source_url': f"URLs for local sources must start with file:// (or specify no scheme)"
})

def to_objectchange(self, action):
objectchange = super().to_objectchange(action)

# Censor any backend parameters marked as sensitive in the serialized data
pre_change_params = {}
post_change_params = {}
if objectchange.prechange_data:
pre_change_params = objectchange.prechange_data.get('parameters') or {} # parameters may be None
if objectchange.postchange_data:
post_change_params = objectchange.postchange_data.get('parameters') or {}
for param in self.backend_class.sensitive_parameters:
if post_change_params.get(param):
if post_change_params[param] != pre_change_params.get(param):
# Set the "changed" token if the parameter's value has been modified
post_change_params[param] = CENSOR_TOKEN_CHANGED
else:
post_change_params[param] = CENSOR_TOKEN
if pre_change_params.get(param):
pre_change_params[param] = CENSOR_TOKEN

return objectchange

def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
Expand Down
122 changes: 122 additions & 0 deletions netbox/core/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from django.test import TestCase

from core.models import DataSource
from extras.choices import ObjectChangeActionChoices
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED


class DataSourceChangeLoggingTestCase(TestCase):

def test_password_added_on_create(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_CREATE)
self.assertIsNone(objectchange.prechange_data)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_added_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/'
)
datasource.snapshot()

# Add a blank password
datasource.parameters = {
'username': 'jeff',
'password': '',
}

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertIsNone(objectchange.prechange_data['parameters'])
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')

# Add a password
datasource.parameters = {
'username': 'jeff',
'password': 'foobar123',
}

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_changed(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'password1',
}
)
datasource.snapshot()

# Change the password
datasource.parameters['password'] = 'password2'

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_removed_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)
datasource.snapshot()

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)

# Remove the password
datasource.parameters['password'] = ''

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')

def test_password_not_modified(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'username1',
'password': 'foobar123',
}
)
datasource.snapshot()

# Remove the password
datasource.parameters['username'] = 'username2'

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'username1')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'username2')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)
4 changes: 4 additions & 0 deletions netbox/netbox/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@
'bulk_edit': {'change'},
'bulk_delete': {'delete'},
}

# General-purpose tokens
CENSOR_TOKEN = '********'
CENSOR_TOKEN_CHANGED = '***CHANGED***'