diff --git a/netbox/extras/context_managers.py b/netbox/extras/context_managers.py index 9f73fe9c39e..32323999efe 100644 --- a/netbox/extras/context_managers.py +++ b/netbox/extras/context_managers.py @@ -1,10 +1,6 @@ from contextlib import contextmanager -from django.db.models.signals import m2m_changed, pre_delete, post_save - -from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object -from netbox import thread_locals -from netbox.request_context import set_request +from netbox.context import current_request, webhooks_queue from .webhooks import flush_webhooks @@ -16,27 +12,14 @@ def change_logging(request): :param request: WSGIRequest object with a unique `id` set """ - set_request(request) - thread_locals.webhook_queue = [] - - # Connect our receivers to the post_save and post_delete signals. - post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object') - m2m_changed.connect(handle_changed_object, dispatch_uid='handle_changed_object') - pre_delete.connect(handle_deleted_object, dispatch_uid='handle_deleted_object') - clear_webhooks.connect(clear_webhook_queue, dispatch_uid='clear_webhook_queue') + current_request.set(request) + webhooks_queue.set([]) yield - # Disconnect change logging signals. This is necessary to avoid recording any errant - # changes during test cleanup. - post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') - m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') - pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object') - clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue') - # Flush queued webhooks to RQ - flush_webhooks(thread_locals.webhook_queue) - del thread_locals.webhook_queue + flush_webhooks(webhooks_queue.get()) - # Clear the request from thread-local storage - set_request(None) + # Clear context vars + current_request.set(None) + webhooks_queue.set([]) diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index aff350cc4f3..31e0c126cf7 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -7,14 +7,14 @@ from django_prometheus.models import model_deletes, model_inserts, model_updates from extras.validators import CustomValidator -from netbox import thread_locals from netbox.config import get_config -from netbox.request_context import get_request +from netbox.context import current_request, webhooks_queue from netbox.signals import post_clean from .choices import ObjectChangeActionChoices from .models import ConfigRevision, CustomField, ObjectChange from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook + # # Change logging/webhooks # @@ -23,22 +23,32 @@ clear_webhooks = Signal() +def is_same_object(instance, webhook_data, request_id): + """ + Compare the given instance to the most recent queued webhook object, returning True + if they match. This check is used to avoid creating duplicate webhook entries. + """ + return ( + ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and + instance.pk == webhook_data['object_id'] and + request_id == webhook_data['request_id'] + ) + + +@receiver((post_save, m2m_changed)) def handle_changed_object(sender, instance, **kwargs): """ Fires when an object is created or updated. """ + m2m_changed = False + if not hasattr(instance, 'to_objectchange'): return - request = get_request() - m2m_changed = False - - def is_same_object(instance, webhook_data): - return ( - ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and - instance.pk == webhook_data['object_id'] and - request.id == webhook_data['request_id'] - ) + # Get the current request, or bail if not set + request = current_request.get() + if request is None: + return # Determine the type of change being made if kwargs.get('created'): @@ -69,13 +79,14 @@ def is_same_object(instance, webhook_data): objectchange.save() # If this is an M2M change, update the previously queued webhook (from post_save) - webhook_queue = thread_locals.webhook_queue - if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]): + queue = webhooks_queue.get() + if m2m_changed and queue and is_same_object(instance, queue[-1], request.id): instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments - webhook_queue[-1]['data'] = serialize_for_webhook(instance) - webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] + queue[-1]['data'] = serialize_for_webhook(instance) + queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] else: - enqueue_object(webhook_queue, instance, request.user, request.id, action) + enqueue_object(queue, instance, request.user, request.id, action) + webhooks_queue.set(queue) # Increment metric counters if action == ObjectChangeActionChoices.ACTION_CREATE: @@ -84,6 +95,7 @@ def is_same_object(instance, webhook_data): model_updates.labels(instance._meta.model_name).inc() +@receiver(pre_delete) def handle_deleted_object(sender, instance, **kwargs): """ Fires when an object is deleted. @@ -91,7 +103,10 @@ def handle_deleted_object(sender, instance, **kwargs): if not hasattr(instance, 'to_objectchange'): return - request = get_request() + # Get the current request, or bail if not set + request = current_request.get() + if request is None: + return # Record an ObjectChange if applicable if hasattr(instance, 'to_objectchange'): @@ -101,22 +116,22 @@ def handle_deleted_object(sender, instance, **kwargs): objectchange.save() # Enqueue webhooks - webhook_queue = thread_locals.webhook_queue - enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) + queue = webhooks_queue.get() + enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) + webhooks_queue.set(queue) # Increment metric counters model_deletes.labels(instance._meta.model_name).inc() +@receiver(clear_webhooks) def clear_webhook_queue(sender, **kwargs): """ Delete any queued webhooks (e.g. because of an aborted bulk transaction) """ logger = logging.getLogger('webhooks') - webhook_queue = thread_locals.webhook_queue - - logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})") - webhook_queue.clear() + logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})") + webhooks_queue.set([]) # diff --git a/netbox/netbox/__init__.py b/netbox/netbox/__init__.py index 5cf43102523..e69de29bb2d 100644 --- a/netbox/netbox/__init__.py +++ b/netbox/netbox/__init__.py @@ -1,3 +0,0 @@ -import threading - -thread_locals = threading.local() diff --git a/netbox/netbox/context.py b/netbox/netbox/context.py new file mode 100644 index 00000000000..b5e4dc28ed4 --- /dev/null +++ b/netbox/netbox/context.py @@ -0,0 +1,10 @@ +from contextvars import ContextVar + +__all__ = ( + 'current_request', + 'webhooks_queue', +) + + +current_request = ContextVar('current_request', default=None) +webhooks_queue = ContextVar('webhooks_queue') diff --git a/netbox/netbox/request_context.py b/netbox/netbox/request_context.py deleted file mode 100644 index 41e8283e8c5..00000000000 --- a/netbox/netbox/request_context.py +++ /dev/null @@ -1,9 +0,0 @@ -from netbox import thread_locals - - -def set_request(request): - thread_locals.request = request - - -def get_request(): - return getattr(thread_locals, 'request', None)