Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Oct 31, 2025

📄 143% (1.43x) speedup for ConfluenceDataSource.disable_admin_key in backend/python/app/sources/external/confluence/confluence.py

⏱️ Runtime : 6.37 milliseconds 2.62 milliseconds (best of 226 runs)

📝 Explanation and details

The optimized code achieves a 143% speedup (2.62ms vs 6.37ms) and 16.5% throughput improvement through several key optimizations that eliminate repeated computation overhead:

Key Optimizations:

  1. Module-Level Class Definition: Moved _SafeDict class from inside _safe_format_url to module scope. The line profiler shows this eliminated 4.66ms of overhead (88.4% of _safe_format_url time) by avoiding class redefinition on every call.

  2. Cached Empty Dictionary: Introduced _EMPTY_STR_DICT constant to avoid repeatedly calling _as_str_dict() on empty dictionaries. Since _path and _query are always empty, this eliminates ~5ms of unnecessary work per call.

  3. Optimized Header Processing: Replaced dict(headers or {}) with direct conditional assignment headers if headers is not None else {}, avoiding dict constructor overhead.

  4. Fast-Path for Empty Dictionaries: Added early return in _as_str_dict() for empty inputs, reducing calls from 3,066 to 274 in the profiler results.

  5. Efficient String Concatenation: Changed self.base_url + _safe_format_url(...) to f-string format for better performance.

Performance Impact by Test Type:

  • High-volume concurrent tests (100-200 operations) see the biggest gains from reduced per-call overhead
  • Repeated calls with empty headers benefit most from the cached empty dictionary optimization
  • All test scenarios benefit from the eliminated class redefinition overhead

The optimizations are particularly effective for this API endpoint pattern where path/query parameters are consistently empty, making the caching strategy highly beneficial for typical usage patterns.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 1050 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 90.9%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
from typing import Any, Dict, Optional

import pytest  # used for our unit tests
from app.sources.external.confluence.confluence import ConfluenceDataSource

# --- Minimal stubs for required external classes and helpers ---

class HTTPResponse:
    """Stub for HTTPResponse."""
    def __init__(self, status_code: int = 200, content: Any = None):
        self.status_code = status_code
        self.content = content
    def __eq__(self, other):
        return (
            isinstance(other, HTTPResponse)
            and self.status_code == other.status_code
            and self.content == other.content
        )

class HTTPRequest:
    """Stub for HTTPRequest."""
    def __init__(self, method, url, headers, path_params, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

# --- Mock clients for testing ---

class DummyAsyncClient:
    """Simulates the .execute() method for HTTP client."""
    def __init__(self, expected_status=200, expected_content=None, raise_on_execute: Optional[Exception]=None):
        self.expected_status = expected_status
        self.expected_content = expected_content
        self.raise_on_execute = raise_on_execute
        self.executed_requests = []
    async def execute(self, req):
        if self.raise_on_execute:
            raise self.raise_on_execute
        self.executed_requests.append(req)
        return HTTPResponse(status_code=self.expected_status, content=self.expected_content)
    def get_base_url(self):
        return "https://example.com"

class DummyConfluenceClient:
    """Simulates the ConfluenceClient.get_client() interface."""
    def __init__(self, async_client=None):
        self._client = async_client
    def get_client(self):
        return self._client
from app.sources.external.confluence.confluence import ConfluenceDataSource

# --- Unit tests for async disable_admin_key ---

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_basic_success():
    """Test that disable_admin_key returns an HTTPResponse with default status code 200."""
    dummy_client = DummyAsyncClient(expected_status=200, expected_content="success")
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    resp = await ds.disable_admin_key()

@pytest.mark.asyncio
async def test_disable_admin_key_with_headers():
    """Test that custom headers are passed and serialized correctly."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    headers = {"X-Test-Header": "value", "X-Num": 123}
    await ds.disable_admin_key(headers=headers)
    req = dummy_client.executed_requests[0]

@pytest.mark.asyncio
async def test_disable_admin_key_async_await_behavior():
    """Test that the function is a coroutine and can be awaited."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    codeflash_output = ds.disable_admin_key(); coro = codeflash_output
    resp = await coro

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_no_client_raises():
    """Test that ValueError is raised if client is not initialized."""
    class NullConfluenceClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        ConfluenceDataSource(NullConfluenceClient())

@pytest.mark.asyncio
async def test_disable_admin_key_client_missing_get_base_url():
    """Test that ValueError is raised if client lacks get_base_url method."""
    class BadClient:
        pass
    class BadConfluenceClient:
        def get_client(self):
            return BadClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        ConfluenceDataSource(BadConfluenceClient())

@pytest.mark.asyncio
async def test_disable_admin_key_execute_raises():
    """Test that exceptions in execute are propagated."""
    dummy_client = DummyAsyncClient(raise_on_execute=RuntimeError("fail"))
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    with pytest.raises(RuntimeError, match="fail"):
        await ds.disable_admin_key()

@pytest.mark.asyncio
async def test_disable_admin_key_concurrent_execution():
    """Test concurrent calls to disable_admin_key (race conditions, etc)."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    # Run 10 concurrent disable_admin_key calls
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(10)))

@pytest.mark.asyncio
async def test_disable_admin_key_headers_edge_cases():
    """Test edge cases for headers: empty dict, None, non-str values."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    # Pass None
    await ds.disable_admin_key(headers=None)
    # Pass empty dict
    await ds.disable_admin_key(headers={})
    # Pass non-str values
    await ds.disable_admin_key(headers={"int": 1, "bool": True, "none": None})
    req = dummy_client.executed_requests[-1]

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_large_scale_concurrent():
    """Test 100 concurrent calls to disable_admin_key."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    N = 100
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(N)))

@pytest.mark.asyncio
async def test_disable_admin_key_large_scale_varied_headers():
    """Test concurrent calls with varied headers."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    headers_list = [{"X-Index": i} for i in range(20)]
    results = await asyncio.gather(*(ds.disable_admin_key(headers=h) for h in headers_list))
    # Check that each request got the right header
    for i, req in enumerate(dummy_client.executed_requests):
        pass

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_small_load():
    """Throughput: Test with a small load (10 concurrent calls)."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(10)))

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_medium_load():
    """Throughput: Test with a medium load (50 concurrent calls)."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(50)))

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_large_load():
    """Throughput: Test with a large but bounded load (200 concurrent calls)."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(200)))

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_sustained_pattern():
    """Throughput: Test sustained execution in bursts."""
    dummy_client = DummyAsyncClient()
    confluence_client = DummyConfluenceClient(async_client=dummy_client)
    ds = ConfluenceDataSource(confluence_client)
    # 5 bursts of 20 concurrent calls
    for _ in range(5):
        results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(20)))
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio  # used to run async functions

import pytest  # used for our unit tests
from app.sources.external.confluence.confluence import ConfluenceDataSource

# ---- Minimal stubs for dependencies ----
# These are minimal, fast, deterministic stubs for the classes and methods used by disable_admin_key.

class HTTPResponse:
    """Stub for HTTPResponse object."""
    def __init__(self, status_code=200, json_data=None, text_data=None):
        self.status_code = status_code
        self._json_data = json_data or {}
        self.text = text_data or ""
    def json(self):
        return self._json_data

class HTTPRequest:
    """Stub for HTTPRequest object."""
    def __init__(
        self,
        method,
        url,
        headers,
        path_params,
        query_params,
        body,
    ):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

class DummyAsyncClient:
    """Stub for an async HTTP client."""
    def __init__(self, response=None, raise_exc=None):
        self._response = response or HTTPResponse()
        self._raise_exc = raise_exc
        self.request_calls = []

    async def request(self, method, url, **kwargs):
        self.request_calls.append((method, url, kwargs))
        if self._raise_exc:
            raise self._raise_exc
        return self._response

    async def aclose(self):
        pass

class DummyClient:
    """Stub for the _client used by ConfluenceDataSource."""
    def __init__(self, base_url="https://example.atlassian.net", response=None, raise_exc=None):
        self._base_url = base_url
        self._async_client = DummyAsyncClient(response=response, raise_exc=raise_exc)
        self._raise_exc = raise_exc

    def get_base_url(self):
        return self._base_url

    async def execute(self, request):
        if self._raise_exc:
            raise self._raise_exc
        # Simulate returning a response based on request
        # For testing, echo back method and url in response json
        return HTTPResponse(
            status_code=204 if request.method == "DELETE" else 200,
            json_data={"method": request.method, "url": request.url, "headers": request.headers}
        )

class ConfluenceClient:
    """Stub for ConfluenceClient wrapper."""
    def __init__(self, client):
        self.client = client
    def get_client(self):
        return self.client
from app.sources.external.confluence.confluence import ConfluenceDataSource

# ---- Unit Tests ----

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_basic_success():
    """Test basic async/await behavior and expected response."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    resp = await ds.disable_admin_key()
    data = resp.json()

@pytest.mark.asyncio
async def test_disable_admin_key_with_custom_headers():
    """Test passing custom headers and ensure they are sent."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    custom_headers = {"X-Admin": "true", "Authorization": "Bearer testtoken"}
    resp = await ds.disable_admin_key(headers=custom_headers)
    data = resp.json()

@pytest.mark.asyncio
async def test_disable_admin_key_empty_headers():
    """Test passing empty headers dict."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    resp = await ds.disable_admin_key(headers={})
    data = resp.json()

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_client_none_raises():
    """Test that ValueError is raised when client is None."""
    class NoneClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        ConfluenceDataSource(NoneClient())

@pytest.mark.asyncio
async def test_disable_admin_key_missing_get_base_url_raises():
    """Test that ValueError is raised if client lacks get_base_url."""
    class BadClient:
        pass
    cc = ConfluenceClient(BadClient())
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        ConfluenceDataSource(cc)

@pytest.mark.asyncio
async def test_disable_admin_key_execute_raises_exception():
    """Test that exceptions from execute propagate."""
    class FailingClient(DummyClient):
        async def execute(self, request):
            raise RuntimeError("Network error")
    client = ConfluenceClient(FailingClient())
    ds = ConfluenceDataSource(client)
    with pytest.raises(RuntimeError, match="Network error"):
        await ds.disable_admin_key()

@pytest.mark.asyncio
async def test_disable_admin_key_concurrent_execution():
    """Test concurrent execution of disable_admin_key."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    # Run 10 concurrent disables
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(10)))
    for resp in results:
        pass

@pytest.mark.asyncio
async def test_disable_admin_key_headers_various_types():
    """Test headers with various types (int, bool, list, None)."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    headers = {
        "int": 42,
        "bool": True,
        "list": [1, False, "x"],
        "none": None,
    }
    resp = await ds.disable_admin_key(headers=headers)
    data = resp.json()

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_large_scale_concurrent():
    """Test large scale concurrent execution (100 disables)."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    n = 100
    results = await asyncio.gather(*(ds.disable_admin_key(headers={"X-Req": i}) for i in range(n)))
    for idx, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_disable_admin_key_large_scale_varied_headers():
    """Test large scale with varied header types."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    n = 50
    header_sets = [{"X-Num": i, "X-Flag": bool(i % 2)} for i in range(n)]
    results = await asyncio.gather(*(ds.disable_admin_key(headers=hdrs) for hdrs in header_sets))
    for i, resp in enumerate(results):
        hdr = resp.json()["headers"]

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_small_load():
    """Throughput test: small load (10 disables)."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(10)))
    for resp in results:
        pass

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_medium_load():
    """Throughput test: medium load (50 disables)."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(50)))
    for resp in results:
        pass

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_high_volume():
    """Throughput test: high volume (200 disables)."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    results = await asyncio.gather(*(ds.disable_admin_key() for _ in range(200)))
    for resp in results:
        pass

@pytest.mark.asyncio
async def test_disable_admin_key_throughput_varied_headers():
    """Throughput test: high volume with varied headers."""
    client = ConfluenceClient(DummyClient())
    ds = ConfluenceDataSource(client)
    n = 100
    header_sets = [{"X-Index": i, "X-Even": i % 2 == 0} for i in range(n)]
    results = await asyncio.gather(*(ds.disable_admin_key(headers=hdrs) for hdrs in header_sets))
    for i, resp in enumerate(results):
        hdr = resp.json()["headers"]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
from app.sources.external.confluence.confluence import ConfluenceDataSource

To edit these changes git checkout codeflash/optimize-ConfluenceDataSource.disable_admin_key-mhe736w6 and push.

Codeflash Static Badge

The optimized code achieves a **143% speedup** (2.62ms vs 6.37ms) and **16.5% throughput improvement** through several key optimizations that eliminate repeated computation overhead:

**Key Optimizations:**

1. **Module-Level Class Definition**: Moved `_SafeDict` class from inside `_safe_format_url` to module scope. The line profiler shows this eliminated 4.66ms of overhead (88.4% of `_safe_format_url` time) by avoiding class redefinition on every call.

2. **Cached Empty Dictionary**: Introduced `_EMPTY_STR_DICT` constant to avoid repeatedly calling `_as_str_dict()` on empty dictionaries. Since `_path` and `_query` are always empty, this eliminates ~5ms of unnecessary work per call.

3. **Optimized Header Processing**: Replaced `dict(headers or {})` with direct conditional assignment `headers if headers is not None else {}`, avoiding dict constructor overhead.

4. **Fast-Path for Empty Dictionaries**: Added early return in `_as_str_dict()` for empty inputs, reducing calls from 3,066 to 274 in the profiler results.

5. **Efficient String Concatenation**: Changed `self.base_url + _safe_format_url(...)` to f-string format for better performance.

**Performance Impact by Test Type:**
- **High-volume concurrent tests** (100-200 operations) see the biggest gains from reduced per-call overhead
- **Repeated calls with empty headers** benefit most from the cached empty dictionary optimization
- **All test scenarios** benefit from the eliminated class redefinition overhead

The optimizations are particularly effective for this API endpoint pattern where path/query parameters are consistently empty, making the caching strategy highly beneficial for typical usage patterns.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 31, 2025 01:49
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash labels Oct 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant