Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
"""Project-wide constants used across multiple modules."""

DEFAULT_COMMAND_PREFIX: str = "!/"
"""Default command prefix for interactive commands."""

MAX_RECENT_USAGE_RECORDS: int = 1000
"""Maximum number of recent usage records that can be requested at once."""
61 changes: 58 additions & 3 deletions src/core/app/controllers/usage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from src.core.di.services import get_or_build_service_provider
from src.core.domain.usage_data import UsageData
from src.core.interfaces.usage_tracking_interface import IUsageTrackingService
from src.constants import MAX_RECENT_USAGE_RECORDS
from src.core.common.usage_limits import normalize_recent_usage_limit

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,8 +64,31 @@ async def get_recent_usage(
if not self.usage_service:
return []

try:
requested_limit = int(limit)
except (TypeError, ValueError):
requested_limit = 0

normalized_limit = normalize_recent_usage_limit(requested_limit)

if normalized_limit == 0:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Recent usage requested with limit=%s; returning empty result", limit
)
return []

if normalized_limit < requested_limit:
if logger.isEnabledFor(logging.INFO):
logger.info(
"Recent usage limit clamped from %s to %s (max=%s)",
limit,
normalized_limit,
MAX_RECENT_USAGE_RECORDS,
)

result = await self.usage_service.get_recent_usage(
session_id=session_id, limit=limit
session_id=session_id, limit=normalized_limit
)
return result # type: ignore[no-any-return]

Expand Down Expand Up @@ -92,7 +117,12 @@ async def get_usage_stats(
@router.get("/recent", response_model=list[UsageData])
async def get_recent_usage(
session_id: str | None = Query(None, description="Filter by session ID"),
limit: int = Query(100, description="Maximum number of records to return"),
limit: int = Query(
100,
description="Maximum number of records to return",
ge=0,
le=MAX_RECENT_USAGE_RECORDS,
),
service_provider: Any = Depends(get_or_build_service_provider),
) -> list[UsageData]:
"""Get recent usage data.
Expand All @@ -106,5 +136,30 @@ async def get_recent_usage(
List of usage data entities
"""
usage_service = service_provider.get_required_service(IUsageTrackingService)
result = await usage_service.get_recent_usage(session_id=session_id, limit=limit)
try:
requested_limit = int(limit)
except (TypeError, ValueError):
requested_limit = 0

normalized_limit = normalize_recent_usage_limit(requested_limit)

if normalized_limit == 0:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"API recent usage requested with limit=%s; returning empty result", limit
)
return []

if normalized_limit < requested_limit:
if logger.isEnabledFor(logging.INFO):
logger.info(
"API recent usage limit clamped from %s to %s (max=%s)",
limit,
normalized_limit,
MAX_RECENT_USAGE_RECORDS,
)

result = await usage_service.get_recent_usage(
session_id=session_id, limit=normalized_limit
)
return result # type: ignore[no-any-return]
31 changes: 31 additions & 0 deletions src/core/common/usage_limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Utilities for normalizing usage-related request parameters."""

from __future__ import annotations

from typing import Any

from src.constants import MAX_RECENT_USAGE_RECORDS


def normalize_recent_usage_limit(limit: Any) -> int:
"""Normalize the recent usage limit value to a safe, bounded integer.

Args:
limit: The requested limit value that may come from untrusted sources.

Returns:
A non-negative integer that does not exceed :data:`MAX_RECENT_USAGE_RECORDS`.
Invalid or non-positive values yield ``0`` so callers can short-circuit expensive
repository lookups.
"""

try:
numeric_limit = int(limit)
except (TypeError, ValueError):
return 0

if numeric_limit <= 0:
return 0

return min(numeric_limit, MAX_RECENT_USAGE_RECORDS)

30 changes: 29 additions & 1 deletion src/core/services/usage_tracking_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def headers(self) -> dict[str, str]: ...
def media_type(self) -> str: ...


from src.constants import MAX_RECENT_USAGE_RECORDS
from src.core.common.usage_limits import normalize_recent_usage_limit
from src.core.domain.usage_data import UsageData
from src.core.domain.usage_stats import ModelUsageStats, UsageStatsResponse
from src.core.interfaces.repositories_interface import IUsageRepository
Expand Down Expand Up @@ -341,11 +343,37 @@ async def get_recent_usage(
Returns:
List of usage data entities
"""
try:
requested_limit = int(limit)
except (TypeError, ValueError):
requested_limit = 0

normalized_limit = normalize_recent_usage_limit(requested_limit)

if normalized_limit == 0:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Recent usage requested with limit=%s; returning empty result", limit
)
return []

if normalized_limit < requested_limit:
if logger.isEnabledFor(logging.INFO):
logger.info(
"Recent usage limit clamped from %s to %s (max=%s)",
limit,
normalized_limit,
MAX_RECENT_USAGE_RECORDS,
)

if session_id:
data = await self._repository.get_by_session_id(session_id)
else:
data = await self._repository.get_all()

# Sort by timestamp (newest first) and limit
if not data:
return []

sorted_data = sorted(data, key=lambda x: x.timestamp, reverse=True)
return sorted_data[:limit]
return sorted_data[:normalized_limit]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest.mock import AsyncMock

import pytest
from src.constants import MAX_RECENT_USAGE_RECORDS
from src.core.app.controllers.usage_controller import UsageController
from src.core.domain.usage_data import UsageData
from src.core.interfaces.usage_tracking_interface import IUsageTrackingService
Expand Down Expand Up @@ -228,23 +229,29 @@ async def test_get_recent_usage_large_limit(

assert result == mock_usage_data
mock_usage_service.get_recent_usage.assert_called_once_with(
session_id=None, limit=10000
session_id=None, limit=MAX_RECENT_USAGE_RECORDS
)

@pytest.mark.asyncio
async def test_get_recent_usage_zero_limit(
self, controller: UsageController, mock_usage_service: IUsageTrackingService
) -> None:
"""Test get_recent_usage with zero limit."""
mock_usage_data = []
mock_usage_service.get_recent_usage.return_value = mock_usage_data

result = await controller.get_recent_usage(limit=0)

assert result == mock_usage_data
mock_usage_service.get_recent_usage.assert_called_once_with(
session_id=None, limit=0
)
assert result == []
mock_usage_service.get_recent_usage.assert_not_called()

@pytest.mark.asyncio
async def test_get_recent_usage_negative_limit(
self, controller: UsageController, mock_usage_service: IUsageTrackingService
) -> None:
"""Test get_recent_usage with negative limit."""

result = await controller.get_recent_usage(limit=-5)

assert result == []
mock_usage_service.get_recent_usage.assert_not_called()
Comment on lines +245 to +254
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Duplicate test function name.

The test function test_get_recent_usage_negative_limit is defined twice (lines 245-254 and lines 389-397). Python will only keep the second definition, causing the first test to be silently ignored. This defeats the purpose of having comprehensive test coverage.

Rename one of these test functions to avoid the conflict. For example:

 @pytest.mark.asyncio
-async def test_get_recent_usage_negative_limit(
+async def test_get_recent_usage_negative_limit_returns_empty(
     self, controller: UsageController, mock_usage_service: IUsageTrackingService
 ) -> None:
-    """Test get_recent_usage with negative limit."""
+    """Test get_recent_usage with negative limit returns empty result."""

     result = await controller.get_recent_usage(limit=-5)

     assert result == []
     mock_usage_service.get_recent_usage.assert_not_called()
🤖 Prompt for AI Agents
In tests/unit/core/app/controllers/test_usage_controller_comprehensive.py around
lines 245-254, the test function name test_get_recent_usage_negative_limit is
duplicated later; rename this occurrence to a unique name (e.g.,
test_get_recent_usage_with_negative_limit_returns_empty) and update any
references or fixtures if needed so both tests coexist without name collision,
then run the test suite to verify both execute.


@pytest.mark.asyncio
async def test_service_error_handling_stats(
Expand Down Expand Up @@ -384,15 +391,10 @@ async def test_get_recent_usage_negative_limit(
self, controller: UsageController, mock_usage_service: IUsageTrackingService
) -> None:
"""Test get_recent_usage with negative limit value."""
mock_usage_data = []
mock_usage_service.get_recent_usage.return_value = mock_usage_data

result = await controller.get_recent_usage(limit=-10)

assert result == mock_usage_data
mock_usage_service.get_recent_usage.assert_called_once_with(
session_id=None, limit=-10
)
assert result == []
mock_usage_service.get_recent_usage.assert_not_called()

@pytest.mark.asyncio
async def test_get_usage_stats_zero_days(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from unittest.mock import AsyncMock, patch

import pytest
from src.constants import MAX_RECENT_USAGE_RECORDS
from src.core.domain.usage_data import UsageData
from src.core.interfaces.repositories_interface import IUsageRepository
from src.core.services.usage_tracking_service import UsageTrackingService
Expand Down Expand Up @@ -346,6 +347,28 @@ async def test_get_recent_usage(
assert result == mock_usage_data
mock_repository.get_by_session_id.assert_called_once_with("session1")

@pytest.mark.asyncio
async def test_get_recent_usage_zero_limit(
self, service: UsageTrackingService, mock_repository: IUsageRepository
) -> None:
"""Recent usage should return empty results for non-positive limits."""

result = await service.get_recent_usage(limit=0)

assert result == []
mock_repository.get_all.assert_not_called()

@pytest.mark.asyncio
async def test_get_recent_usage_negative_limit(
self, service: UsageTrackingService, mock_repository: IUsageRepository
) -> None:
"""Negative limits should be treated as zero to avoid large responses."""

result = await service.get_recent_usage(limit=-10)

assert result == []
mock_repository.get_all.assert_not_called()

@pytest.mark.asyncio
async def test_get_recent_usage_defaults(
self, service: UsageTrackingService, mock_repository: IUsageRepository
Expand All @@ -359,6 +382,32 @@ async def test_get_recent_usage_defaults(
assert result == mock_usage_data
mock_repository.get_all.assert_called_once()

@pytest.mark.asyncio
async def test_get_recent_usage_large_limit_is_clamped(
self, service: UsageTrackingService, mock_repository: IUsageRepository
) -> None:
"""Large limits should be clamped to protect against excessive workloads."""

mock_usage_data = [
UsageData(
id=str(index),
session_id="session",
model="model",
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
cost=0.0,
timestamp=datetime.now(timezone.utc) + timedelta(seconds=index),
)
for index in range(MAX_RECENT_USAGE_RECORDS + 50)
]
mock_repository.get_all.return_value = mock_usage_data

result = await service.get_recent_usage(limit=10_000)

assert len(result) == MAX_RECENT_USAGE_RECORDS
mock_repository.get_all.assert_called_once()

@pytest.mark.asyncio
async def test_get_recent_usage_with_session_id(
self, service: UsageTrackingService, mock_repository: IUsageRepository
Expand Down
Loading