Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release-notes/version-2.11.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bug Fixes

* [#6064](https://github.com/netbox-community/netbox/issues/6064) - Fix object permission assignments for user and group models
* [#6284](https://github.com/netbox-community/netbox/issues/6284) - Avoid sending redundant webhooks when adding/removing tags
* [#6496](https://github.com/netbox-community/netbox/issues/6496) - Fix upgrade script when Python installed in nonstandard path
* [#6502](https://github.com/netbox-community/netbox/issues/6502) - Correct permissions evaluation for running a report via the REST API

Expand Down
11 changes: 9 additions & 2 deletions netbox/extras/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from extras.signals import _handle_changed_object, _handle_deleted_object
from utilities.utils import curry
from .webhooks import flush_webhooks


@contextmanager
Expand All @@ -14,9 +15,11 @@ def change_logging(request):

:param request: WSGIRequest object with a unique `id` set
"""
webhook_queue = []

# Curry signals receivers to pass the current request
handle_changed_object = curry(_handle_changed_object, request)
handle_deleted_object = curry(_handle_deleted_object, request)
handle_changed_object = curry(_handle_changed_object, request, webhook_queue)
handle_deleted_object = curry(_handle_deleted_object, request, webhook_queue)

# Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
Expand All @@ -30,3 +33,7 @@ def change_logging(request):
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')

# Flush queued webhooks to RQ
flush_webhooks(webhook_queue)
del webhook_queue
30 changes: 24 additions & 6 deletions netbox/extras/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,27 @@

from .choices import ObjectChangeActionChoices
from .models import CustomField, ObjectChange
from .webhooks import enqueue_webhooks
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook


#
# Change logging/webhooks
#

def _handle_changed_object(request, sender, instance, **kwargs):
def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
def is_same_object(instance, webhook_data):
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request.id == webhook_data['request_id']
)

if not hasattr(instance, 'to_objectchange'):
return

m2m_changed = False

# Determine the type of change being made
Expand Down Expand Up @@ -53,8 +63,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
objectchange.request_id = request.id
objectchange.save()

# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, action)
# If this is an M2M change, update the previously queued webhook (from post_save)
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(webhook_queue, instance, request.user, request.id, action)

# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
Expand All @@ -68,10 +83,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS)


def _handle_deleted_object(request, sender, instance, **kwargs):
def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
"""
Fires when an object is deleted.
"""
if not hasattr(instance, 'to_objectchange'):
return

# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
Expand All @@ -80,7 +98,7 @@ def _handle_deleted_object(request, sender, instance, **kwargs):
objectchange.save()

# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)

# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()
Expand Down
194 changes: 176 additions & 18 deletions netbox/extras/tests/test_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,19 @@

from dcim.models import Site
from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook
from extras.webhooks import enqueue_webhooks, generate_signature
from extras.models import Tag, Webhook
from extras.webhooks import enqueue_object, flush_webhooks, generate_signature
from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase


class WebhookTest(APITestCase):

def setUp(self):

super().setUp()

self.queue = django_rq.get_queue('default')
self.queue.empty() # Begin each test with an empty queue
self.queue.empty()

@classmethod
def setUpTestData(cls):
Expand All @@ -34,38 +33,104 @@ def setUpTestData(cls):
DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"

webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Webhook 1', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Webhook 2', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Webhook 3', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
))
for webhook in webhooks:
webhook.content_types.set([site_ct])

Tag.objects.bulk_create((
Tag(name='Foo', slug='foo'),
Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'),
))

def test_enqueue_webhook_create(self):
# Create an object via the REST API
data = {
'name': 'Test Site',
'slug': 'test-site',
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
}
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)

# Verify that a job was queued for the object creation webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])

def test_enqueue_webhook_bulk_create(self):
# Create multiple objects via the REST API
data = [
{
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 2',
'slug': 'site-2',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 3',
'slug': 'site-3',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)

# Verify that a webhook was queued for each object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])

def test_enqueue_webhook_update(self):
# Update an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))

# Update an object via the REST API
data = {
'name': 'Site X',
'comments': 'Updated the site',
'tags': [
{'name': 'Baz'}
]
}
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.change_site')
Expand All @@ -76,13 +141,72 @@ def test_enqueue_webhook_update(self):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])

def test_enqueue_webhook_bulk_update(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))

# Update three objects via the REST API
data = [
{
'id': sites[0].pk,
'name': 'Site X',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[1].pk,
'name': 'Site Y',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[2].pk,
'name': 'Site Z',
'tags': [
{'name': 'Baz'}
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])

def test_enqueue_webhook_delete(self):
# Delete an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))

# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, **self.header)
Expand All @@ -92,9 +216,40 @@ def test_enqueue_webhook_delete(self):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])

def test_enqueue_webhook_bulk_delete(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))

# Delete three objects via the REST API
data = [
{'id': site.pk} for site in sites
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)

# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])

def test_webhooks_worker(self):

Expand Down Expand Up @@ -125,13 +280,16 @@ def dummy_send(_, request, **kwargs):
return HttpResponse()

# Enqueue a webhook for processing
webhooks_queue = []
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_webhooks(
enqueue_object(
webhooks_queue,
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
flush_webhooks(webhooks_queue)

# Retrieve the job from queue
job = self.queue.jobs[0]
Expand Down
Loading