46 commits
a1ee613
test cases: fix caching of system settings
valentijnscholten Jul 26, 2025
811f37a
fix tests
valentijnscholten Jul 27, 2025
a9b04a3
fix caching for github
valentijnscholten Jul 27, 2025
5ee9aeb
fix caching for github
valentijnscholten Jul 27, 2025
63badef
simplify cache loading
valentijnscholten Jul 27, 2025
e99e7d4
post process only when needed
valentijnscholten Jul 27, 2025
348b69c
set tags on (re)import
valentijnscholten Jul 27, 2025
7a9326d
rebase set tags
valentijnscholten Jul 27, 2025
d9c4cc4
reduce save with options
valentijnscholten Jul 27, 2025
e717d8f
update counts, reduce saves with options
valentijnscholten Jul 27, 2025
eda6959
importers: do not save again, but postprocess directly
valentijnscholten Jul 27, 2025
395ac9e
update counts
valentijnscholten Jul 27, 2025
30ad08b
optimize hash_code setting
valentijnscholten Jul 27, 2025
551f153
fix counts
valentijnscholten Jul 27, 2025
a285f32
set hash code for new findings in reimport
valentijnscholten Jul 27, 2025
7767089
make smaller second save work
valentijnscholten Aug 1, 2025
8466ed8
make smaller second save work - add no_options
valentijnscholten Aug 1, 2025
aec055f
update query counts
valentijnscholten Aug 1, 2025
7029cce
update counts
valentijnscholten Aug 1, 2025
739c844
remove logging
valentijnscholten Aug 1, 2025
f2aa5b2
perf3b: compute hash_code on first save
valentijnscholten Aug 1, 2025
44bbb4c
fix cve for reimport
valentijnscholten Aug 1, 2025
3cb5daf
ruff
valentijnscholten Aug 1, 2025
ef64887
fix no async
valentijnscholten Aug 3, 2025
10a82ba
Merge remote-tracking branch 'upstream/dev' into perf3-reduce-saves
valentijnscholten Aug 4, 2025
5504c8d
make smaller second save work
valentijnscholten Aug 1, 2025
f48b55f
fix cve for reimport
valentijnscholten Aug 1, 2025
c714da8
initial
valentijnscholten Aug 3, 2025
2d35637
fix counts
valentijnscholten Aug 3, 2025
6507a93
fix counts
valentijnscholten Aug 3, 2025
e2194ab
simplify
valentijnscholten Aug 3, 2025
72e95c3
simplify
valentijnscholten Aug 4, 2025
cb2d0e3
refactor to await results
valentijnscholten Sep 13, 2025
b9b4815
handle reimport and close old findings
valentijnscholten Sep 13, 2025
26295cb
update query and task counts
valentijnscholten Sep 13, 2025
87e5d45
switch back to chords
valentijnscholten Sep 14, 2025
930792d
simplify
valentijnscholten Sep 14, 2025
baac171
respect system settings product grading enabled
valentijnscholten Sep 14, 2025
fdcd1ec
finding/test delete grading only if enabled
valentijnscholten Sep 14, 2025
a1f0005
optimize asyn_dupe_delete grading
valentijnscholten Sep 14, 2025
bd99783
cleanup comments
valentijnscholten Sep 14, 2025
29de952
fix merge artifact
valentijnscholten Sep 15, 2025
e303a48
Merge remote-tracking branch 'upstream/dev' into perf4-chord-grade
valentijnscholten Sep 17, 2025
0ec0a3d
Merge remote-tracking branch 'upstream/dev' into perf4-chord-grade
valentijnscholten Sep 17, 2025
b64f34b
fix loop
valentijnscholten Sep 17, 2025
98ef20d
simplify loop
valentijnscholten Sep 17, 2025
47 changes: 28 additions & 19 deletions dojo/decorators.py
@@ -79,25 +79,34 @@ def we_want_async(*args, func=None, **kwargs):

# Defect Dojo performs all tasks asynchronously using celery
# *unless* the user initiating the task has set block_execution to True in their usercontactinfo profile
def dojo_async_task(func):
@wraps(func)
def __wrapper__(*args, **kwargs):
from dojo.utils import get_current_user # noqa: PLC0415 circular import
user = get_current_user()
kwargs["async_user"] = user

dojo_async_task_counter.incr(
func.__name__,
args=args,
kwargs=kwargs,
)

countdown = kwargs.pop("countdown", 0)
if we_want_async(*args, func=func, **kwargs):
return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
return func(*args, **kwargs)

return __wrapper__
def dojo_async_task(func=None, *, signature=False):
def decorator(func):
@wraps(func)
def __wrapper__(*args, **kwargs):
from dojo.utils import get_current_user # noqa: PLC0415 circular import
user = get_current_user()
kwargs["async_user"] = user

dojo_async_task_counter.incr(
func.__name__,
args=args,
kwargs=kwargs,
)

# Return a signature for use in chord/group if requested
if signature:
return func.si(*args, **kwargs)

countdown = kwargs.pop("countdown", 0)
if we_want_async(*args, func=func, **kwargs):
# Execute the task asynchronously via celery
return func.apply_async(args=args, kwargs=kwargs, countdown=countdown)
return func(*args, **kwargs)
return __wrapper__

if func is None:
return decorator
return decorator(func)


# decorator with parameters needs another wrapper layer
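
For context, a minimal sketch of how the new signature=True mode can be consumed. The recount_findings task and the dojo.celery import path are assumptions for illustration only, not part of this PR:

from celery import group

from dojo.celery import app  # assumed location of the celery app
from dojo.decorators import dojo_async_task


@dojo_async_task(signature=True)
@app.task
def recount_findings(test_id, *args, **kwargs):  # hypothetical task, for illustration only
    ...


# Calling the wrapped function now returns an immutable signature (func.si(...))
# instead of running the task, so many calls can be collected and launched at once.
signatures = [recount_findings(test_id) for test_id in (1, 2, 3)]
group(signatures).apply_async()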
23 changes: 22 additions & 1 deletion dojo/finding/helper.py
@@ -357,13 +357,35 @@ def add_findings_to_auto_group(name, findings, group_by, *, create_finding_group
finding_group.findings.add(*findings)


@dojo_model_to_id
@dojo_async_task(signature=True)
@app.task
@dojo_model_from_id
def post_process_finding_save_signature(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; would be good to fix this universally
"""
Returns a Celery task signature for post-processing a finding. This is useful for building task signatures
that can be composed into chords or groups, or used to await results. We need this extra method because of our
dojo_async_task decorator. If we adopt more of these Celery features, we should probably move away from that decorator.
"""
return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option,
issue_updater_option, push_to_jira, user, *args, **kwargs)


@dojo_model_to_id
@dojo_async_task
@app.task
@dojo_model_from_id
def post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; would be good to fix this universally

return post_process_finding_save_internal(finding, dedupe_option, rules_option, product_grading_option,
issue_updater_option, push_to_jira, user, *args, **kwargs)


def post_process_finding_save_internal(finding, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is a bit hard to fix nicely; would be good to fix this universally

if not finding:
logger.warning("post_process_finding_save called with finding==None, skipping post processing")
return
@@ -455,7 +477,6 @@ def finding_post_delete(sender, instance, **kwargs):
# Catch instances in async delete where a single object is deleted more than once
with suppress(Finding.DoesNotExist):
logger.debug("finding post_delete, sender: %s instance: %s", to_str_typed(sender), to_str_typed(instance))
# calculate_grade(instance.test.engagement.product)


def reset_duplicate_before_delete(dupe):
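A rough sketch of the pattern the signature task enables: collect one signature per finding and let a chord run the grading callback only once, after every post-processing task has finished. The post_process_in_bulk helper below is hypothetical; calculate_grade_signature is assumed to return a celery signature, as it is used in default_importer.py further down:

from celery import chord

from dojo import utils
from dojo.finding import helper as finding_helper


def post_process_in_bulk(findings, product):
    """Illustrative helper, not part of this PR: post-process a batch of findings,
    then recompute the product grade once, after all of them are done."""
    header = [
        finding_helper.post_process_finding_save_signature(
            finding,
            dedupe_option=True,
            rules_option=True,
            product_grading_option=False,  # grading happens once, in the chord callback
            issue_updater_option=True,
            push_to_jira=False,
        )
        for finding in findings
    ]
    # The callback only runs after every header task has completed.
    chord(header)(utils.calculate_grade_signature(product))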
5 changes: 3 additions & 2 deletions dojo/importers/base_importer.py
@@ -743,6 +743,7 @@ def mitigate_finding(
note_message: str,
*,
finding_groups_enabled: bool,
product_grading_option: bool = True,
) -> None:
"""
Mitigates a finding and all of its endpoint statuses, and leaves a note on the finding
@@ -764,9 +765,9 @@
# to avoid pushing a finding group multiple times, we push those outside of the loop
if finding_groups_enabled and finding.finding_group:
# don't try to dedupe findings that we are closing
finding.save(dedupe_option=False)
finding.save(dedupe_option=False, product_grading_option=product_grading_option)
else:
finding.save(dedupe_option=False, push_to_jira=self.push_to_jira)
finding.save(dedupe_option=False, push_to_jira=self.push_to_jira, product_grading_option=product_grading_option)

def notify_scan_added(
self,
90 changes: 79 additions & 11 deletions dojo/importers/default_importer.py
@@ -1,21 +1,26 @@
import logging

from celery import chord, group
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.serializers import serialize
from django.db.models.query_utils import Q
from django.urls import reverse

import dojo.finding.helper as finding_helper
import dojo.jira_link.helper as jira_helper
from dojo import utils
from dojo.decorators import we_want_async
from dojo.finding import helper as finding_helper
from dojo.importers.base_importer import BaseImporter, Parser
from dojo.importers.options import ImporterOptions
from dojo.models import (
Engagement,
Finding,
System_Settings,
Test,
Test_Import,
)
from dojo.notifications.helper import create_notification
from dojo.utils import calculate_grade
from dojo.validators import clean_tags

logger = logging.getLogger(__name__)
@@ -155,6 +160,11 @@ def process_findings(
parsed_findings: list[Finding],
**kwargs: dict,
) -> list[Finding]:
# Progressive batching for chord execution
post_processing_task_signatures = []
current_batch_number = 1
max_batch_size = 1024

"""
Saves findings in memory that were parsed from the scan report into the database.
This process involves first saving associated objects such as endpoints, files,
@@ -166,13 +176,17 @@
logger.debug("starting import of %i parsed findings.", len(parsed_findings) if parsed_findings else 0)
group_names_to_findings_dict = {}

for non_clean_unsaved_finding in parsed_findings:
# make sure the severity is something is digestible
unsaved_finding = self.sanitize_severity(non_clean_unsaved_finding)
# Filter on minimum severity if applicable
if Finding.SEVERITIES[unsaved_finding.severity] > Finding.SEVERITIES[self.minimum_severity]:
# finding's severity is below the configured threshold : ignoring the finding
# Pre-sanitize and filter by minimum severity
cleaned_findings = []
for raw_finding in parsed_findings or []:
sanitized = self.sanitize_severity(raw_finding)
if Finding.SEVERITIES[sanitized.severity] > Finding.SEVERITIES[self.minimum_severity]:
logger.debug("skipping finding due to minimum severity filter (finding=%s severity=%s min=%s)", sanitized.title, sanitized.severity, self.minimum_severity)
continue
cleaned_findings.append(sanitized)

for idx, unsaved_finding in enumerate(cleaned_findings):
is_final_finding = idx == len(cleaned_findings) - 1

# Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
# Finding.mitigated is DateTimeField and it requires timezone
Expand All @@ -183,7 +197,7 @@ def process_findings(
unsaved_finding.reporter = self.user
unsaved_finding.last_reviewed_by = self.user
unsaved_finding.last_reviewed = self.now
logger.debug("process_parsed_findings: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified)
logger.debug("process_parsed_finding: unique_id_from_tool: %s, hash_code: %s, active from report: %s, verified from report: %s", unsaved_finding.unique_id_from_tool, unsaved_finding.hash_code, unsaved_finding.active, unsaved_finding.verified)
# indicates an override. Otherwise, do not change the value of unsaved_finding.active
if self.active is not None:
unsaved_finding.active = self.active
@@ -205,7 +219,6 @@
# postprocessing will be done after processing related fields like endpoints, vulnerability ids, etc.
unsaved_finding.save_no_options()

finding = unsaved_finding
# Determine how the finding should be grouped
self.process_finding_groups(
finding,
@@ -225,9 +238,48 @@
new_findings.append(finding)
# all data is already saved on the finding, we only need to trigger post processing

# to avoid pushing a finding group multiple times, we push those outside of the loop
# We create a signature for the post processing task so we can decide to apply it async or sync
push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
finding_helper.post_process_finding_save(finding, dedupe_option=True, rules_option=True, product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira)
post_processing_task_signature = finding_helper.post_process_finding_save_signature(
finding,
dedupe_option=True,
rules_option=True,
product_grading_option=False,
issue_updater_option=True,
push_to_jira=push_to_jira,
)

post_processing_task_signatures.append(post_processing_task_signature)

# Check if we should launch a chord (batch full or end of findings)
if we_want_async(async_user=self.user) and post_processing_task_signatures:
# Calculate current batch size: 2^batch_number, capped at max_batch_size
# We do this because post processing only starts after all tasks have been added to the chord
# So we start with small batches to minimize the delay
current_batch_size = min(2 ** current_batch_number, max_batch_size)

batch_full = len(post_processing_task_signatures) >= current_batch_size

if batch_full or is_final_finding:
# Launch chord with current batch of signatures
product = self.test.engagement.product
system_settings = System_Settings.objects.get()
if system_settings.enable_product_grade:
calculate_grade_signature = utils.calculate_grade_signature(product)
chord(post_processing_task_signatures)(calculate_grade_signature)
elif post_processing_task_signatures:
# If product grading is disabled, just run the post-processing tasks without the grade calculation callback
group(post_processing_task_signatures).apply_async()

logger.debug(f"Launched chord with {len(post_processing_task_signatures)} tasks (batch #{current_batch_number}, size: {len(post_processing_task_signatures)})")

# Reset for next batch (only if not final)
post_processing_task_signatures = []
if not is_final_finding:
current_batch_number += 1
else:
# Execute task immediately for synchronous processing
post_processing_task_signature()

for (group_name, findings) in group_names_to_findings_dict.items():
finding_helper.add_findings_to_auto_group(
@@ -243,6 +295,14 @@
else:
jira_helper.push_to_jira(findings[0])

# Note: All chord batching is now handled within the loop above

# Always perform an initial grading, even though it might get overwritten later.
product = self.test.engagement.product
system_settings = System_Settings.objects.get()
if system_settings.enable_product_grade:
calculate_grade(product)

sync = kwargs.get("sync", True)
if not sync:
return [serialize("json", [finding]) for finding in new_findings]
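
To make the progressive batching above concrete: batch sizes double per chord (2, 4, 8, ...) and are capped at 1024, so the first findings are post-processed almost immediately while later, larger batches amortize the chord overhead. A small self-contained illustration of the resulting sizes:

# Illustration only: batch sizes produced by the 2**n growth with a 1024 cap used above.
max_batch_size = 1024
batch_sizes = [min(2 ** n, max_batch_size) for n in range(1, 13)]
print(batch_sizes)  # [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024, 1024]
# A report with 100 findings would launch chords of 2, 4, 8, 16, 32 and finally 38 tasks.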
@@ -320,12 +380,20 @@ def close_old_findings(
"as it is not present anymore in recent scans."
),
finding_groups_enabled=self.findings_groups_enabled,
product_grading_option=False,
)
# push finding groups to jira since we only want to push whole groups
if self.findings_groups_enabled and self.push_to_jira:
for finding_group in {finding.finding_group for finding in old_findings if finding.finding_group is not None}:
jira_helper.push_to_jira(finding_group)

# Calculate grade once after all findings have been closed
if old_findings:
product = self.test.engagement.product
system_settings = System_Settings.objects.get()
if system_settings.enable_product_grade:
calculate_grade(product)

return old_findings

def parse_findings_static_test_type(