Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 7 additions & 24 deletions netbox/extras/context_managers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from contextlib import contextmanager

from django.db.models.signals import m2m_changed, pre_delete, post_save

from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object
from netbox import thread_locals
from netbox.request_context import set_request
from netbox.context import current_request, webhooks_queue
from .webhooks import flush_webhooks


Expand All @@ -16,27 +12,14 @@ def change_logging(request):

:param request: WSGIRequest object with a unique `id` set
"""
set_request(request)
thread_locals.webhook_queue = []

# Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.connect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.connect(handle_deleted_object, dispatch_uid='handle_deleted_object')
clear_webhooks.connect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
current_request.set(request)
webhooks_queue.set([])

yield

# Disconnect change logging signals. This is necessary to avoid recording any errant
# changes during test cleanup.
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')
clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')

# Flush queued webhooks to RQ
flush_webhooks(thread_locals.webhook_queue)
del thread_locals.webhook_queue
flush_webhooks(webhooks_queue.get())

# Clear the request from thread-local storage
set_request(None)
# Clear context vars
current_request.set(None)
webhooks_queue.set([])
61 changes: 38 additions & 23 deletions netbox/extras/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from django_prometheus.models import model_deletes, model_inserts, model_updates

from extras.validators import CustomValidator
from netbox import thread_locals
from netbox.config import get_config
from netbox.request_context import get_request
from netbox.context import current_request, webhooks_queue
from netbox.signals import post_clean
from .choices import ObjectChangeActionChoices
from .models import ConfigRevision, CustomField, ObjectChange
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook


#
# Change logging/webhooks
#
Expand All @@ -23,22 +23,32 @@
clear_webhooks = Signal()


def is_same_object(instance, webhook_data, request_id):
"""
Compare the given instance to the most recent queued webhook object, returning True
if they match. This check is used to avoid creating duplicate webhook entries.
"""
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request_id == webhook_data['request_id']
)


@receiver((post_save, m2m_changed))
def handle_changed_object(sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
m2m_changed = False

if not hasattr(instance, 'to_objectchange'):
return

request = get_request()
m2m_changed = False

def is_same_object(instance, webhook_data):
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request.id == webhook_data['request_id']
)
# Get the current request, or bail if not set
request = current_request.get()
if request is None:
return

# Determine the type of change being made
if kwargs.get('created'):
Expand Down Expand Up @@ -69,13 +79,14 @@ def is_same_object(instance, webhook_data):
objectchange.save()

# If this is an M2M change, update the previously queued webhook (from post_save)
webhook_queue = thread_locals.webhook_queue
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
queue = webhooks_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
queue[-1]['data'] = serialize_for_webhook(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(webhook_queue, instance, request.user, request.id, action)
enqueue_object(queue, instance, request.user, request.id, action)
webhooks_queue.set(queue)

# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
Expand All @@ -84,14 +95,18 @@ def is_same_object(instance, webhook_data):
model_updates.labels(instance._meta.model_name).inc()


@receiver(pre_delete)
def handle_deleted_object(sender, instance, **kwargs):
"""
Fires when an object is deleted.
"""
if not hasattr(instance, 'to_objectchange'):
return

request = get_request()
# Get the current request, or bail if not set
request = current_request.get()
if request is None:
return

# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
Expand All @@ -101,22 +116,22 @@ def handle_deleted_object(sender, instance, **kwargs):
objectchange.save()

# Enqueue webhooks
webhook_queue = thread_locals.webhook_queue
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
queue = webhooks_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
webhooks_queue.set(queue)

# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()


@receiver(clear_webhooks)
def clear_webhook_queue(sender, **kwargs):
"""
Delete any queued webhooks (e.g. because of an aborted bulk transaction)
"""
logger = logging.getLogger('webhooks')
webhook_queue = thread_locals.webhook_queue

logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})")
webhook_queue.clear()
logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})")
webhooks_queue.set([])


#
Expand Down
3 changes: 0 additions & 3 deletions netbox/netbox/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
import threading

thread_locals = threading.local()
10 changes: 10 additions & 0 deletions netbox/netbox/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from contextvars import ContextVar

__all__ = (
'current_request',
'webhooks_queue',
)


current_request = ContextVar('current_request', default=None)
webhooks_queue = ContextVar('webhooks_queue')
9 changes: 0 additions & 9 deletions netbox/netbox/request_context.py

This file was deleted.