From 595fe04f75feca62b1299f31442973a811166f91 Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Tue, 9 Sep 2025 09:21:14 +0800 Subject: [PATCH 1/2] fixes --- optillm/plugins/proxy/README.md | 28 +++++ optillm/plugins/proxy/client.py | 182 ++++++++++++++++++++------------ optillm/plugins/proxy/config.py | 28 +++++ tests/test_plugins.py | 103 ++++++++++++++++++ 4 files changed, 275 insertions(+), 66 deletions(-) diff --git a/optillm/plugins/proxy/README.md b/optillm/plugins/proxy/README.md index 05a55867..2e88b8af 100644 --- a/optillm/plugins/proxy/README.md +++ b/optillm/plugins/proxy/README.md @@ -43,6 +43,14 @@ providers: routing: strategy: weighted # Options: weighted, round_robin, failover + +timeouts: + request: 30 # Maximum seconds to wait for a provider response + connect: 5 # Maximum seconds to wait for connection + +queue: + max_concurrent: 100 # Maximum concurrent requests to prevent overload + timeout: 60 # Maximum seconds a request can wait in queue ``` ### 2. Start OptiLLM Server @@ -161,6 +169,26 @@ routing: timeout: 5 # Timeout for health check requests ``` +### Timeout and Queue Management + +Prevent request queue backup and handle slow/unresponsive backends: + +```yaml +timeouts: + request: 30 # Maximum seconds to wait for provider response (default: 30) + connect: 5 # Maximum seconds for initial connection (default: 5) + +queue: + max_concurrent: 100 # Maximum concurrent requests (default: 100) + timeout: 60 # Maximum seconds in queue before rejection (default: 60) +``` + +**How it works:** +- **Request Timeout**: Each request to a provider has a maximum time limit. If exceeded, the request is cancelled and the next provider is tried. +- **Queue Management**: Limits concurrent requests to prevent memory exhaustion. New requests wait up to `queue.timeout` seconds before being rejected. +- **Automatic Failover**: When a provider times out, it's marked unhealthy and the request automatically fails over to the next available provider. +- **Protection**: Prevents slow backends from causing queue buildup that can crash the proxy server. + ### Environment Variables The configuration supports flexible environment variable interpolation: diff --git a/optillm/plugins/proxy/client.py b/optillm/plugins/proxy/client.py index 2b36d18c..1b85972b 100644 --- a/optillm/plugins/proxy/client.py +++ b/optillm/plugins/proxy/client.py @@ -5,6 +5,8 @@ import logging import random from typing import Dict, List, Any, Optional +import concurrent.futures +import threading from openai import OpenAI, AzureOpenAI from optillm.plugins.proxy.routing import RouterFactory from optillm.plugins.proxy.health import HealthChecker @@ -34,13 +36,15 @@ def client(self): self._client = AzureOpenAI( api_key=self.api_key, azure_endpoint=self.base_url, - api_version="2024-02-01" + api_version="2024-02-01", + max_retries=0 # Disable client retries - we handle them ) else: # Standard OpenAI-compatible client self._client = OpenAI( api_key=self.api_key, - base_url=self.base_url + base_url=self.base_url, + max_retries=0 # Disable client retries - we handle them ) return self._client @@ -97,6 +101,17 @@ def __init__(self, config: Dict, fallback_client=None): # Start health checking self.health_checker.start() + # Timeout settings + timeout_config = config.get('timeouts', {}) + self.request_timeout = timeout_config.get('request', 30) # Default 30 seconds + self.connect_timeout = timeout_config.get('connect', 5) # Default 5 seconds + + # Queue management settings + queue_config = config.get('queue', {}) + self.max_concurrent_requests = queue_config.get('max_concurrent', 100) + self.queue_timeout = queue_config.get('timeout', 60) # Max time in queue + self._request_semaphore = threading.Semaphore(self.max_concurrent_requests) + # Monitoring settings monitoring = config.get('monitoring', {}) self.track_latency = monitoring.get('track_latency', True) @@ -123,74 +138,109 @@ def _filter_kwargs(self, kwargs: dict) -> dict: } return {k: v for k, v in kwargs.items() if k not in optillm_params} + def _make_request_with_timeout(self, provider, request_kwargs): + """Make a request with timeout handling""" + # The OpenAI client now supports timeout natively + try: + response = provider.client.chat.completions.create(**request_kwargs) + return response + except Exception as e: + # Check if it's a timeout error + if "timeout" in str(e).lower() or "timed out" in str(e).lower(): + raise TimeoutError(f"Request to {provider.name} timed out after {self.proxy_client.request_timeout}s") + raise e + def create(self, **kwargs): - """Create completion with load balancing and failover""" - model = kwargs.get('model', 'unknown') - attempted_providers = set() - errors = [] - - # Get healthy providers - healthy_providers = [ - p for p in self.proxy_client.active_providers - if p.is_healthy - ] + """Create completion with load balancing, failover, and timeout handling""" + # Check queue capacity + if not self.proxy_client._request_semaphore.acquire(blocking=True, timeout=self.proxy_client.queue_timeout): + raise TimeoutError(f"Request queue timeout after {self.proxy_client.queue_timeout}s - server overloaded") - if not healthy_providers: - logger.warning("No healthy providers, trying fallback providers") - healthy_providers = self.proxy_client.fallback_providers - - # Try routing through healthy providers - while healthy_providers: - available_providers = [p for p in healthy_providers if p not in attempted_providers] - if not available_providers: - break - - provider = self.proxy_client.router.select(available_providers) - logger.info(f"Router selected provider: {provider.name if provider else 'None'}") + try: + model = kwargs.get('model', 'unknown') + attempted_providers = set() + errors = [] - if not provider: - break - - attempted_providers.add(provider) + # Get healthy providers + healthy_providers = [ + p for p in self.proxy_client.active_providers + if p.is_healthy + ] - try: - # Map model name if needed and filter out OptiLLM-specific parameters - request_kwargs = self._filter_kwargs(kwargs.copy()) - request_kwargs['model'] = provider.map_model(model) - - # Track timing - start_time = time.time() - - # Make request - logger.debug(f"Routing to {provider.name}") - response = provider.client.chat.completions.create(**request_kwargs) - - # Track success - latency = time.time() - start_time - if self.proxy_client.track_latency: - provider.track_latency(latency) - - logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s") - return response + if not healthy_providers: + logger.warning("No healthy providers, trying fallback providers") + healthy_providers = self.proxy_client.fallback_providers + + # Try routing through healthy providers + while healthy_providers: + available_providers = [p for p in healthy_providers if p not in attempted_providers] + if not available_providers: + break + + provider = self.proxy_client.router.select(available_providers) + logger.info(f"Router selected provider: {provider.name if provider else 'None'}") - except Exception as e: - logger.error(f"Provider {provider.name} failed: {e}") - errors.append((provider.name, str(e))) + if not provider: + break + + attempted_providers.add(provider) - # Mark provider as unhealthy - if self.proxy_client.track_errors: - provider.is_healthy = False - provider.last_error = str(e) + try: + # Map model name if needed and filter out OptiLLM-specific parameters + request_kwargs = self._filter_kwargs(kwargs.copy()) + request_kwargs['model'] = provider.map_model(model) + + # Add timeout to client if supported + request_kwargs['timeout'] = self.proxy_client.request_timeout + + # Track timing + start_time = time.time() + + # Make request with timeout + logger.debug(f"Routing to {provider.name} with {self.proxy_client.request_timeout}s timeout") + response = self._make_request_with_timeout(provider, request_kwargs) + + # Track success + latency = time.time() - start_time + if self.proxy_client.track_latency: + provider.track_latency(latency) + + logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s") + return response + + except TimeoutError as e: + logger.error(f"Provider {provider.name} timed out: {e}") + errors.append((provider.name, str(e))) + + # Mark provider as unhealthy on timeout + if self.proxy_client.track_errors: + provider.is_healthy = False + provider.last_error = f"Timeout: {str(e)}" + + except Exception as e: + logger.error(f"Provider {provider.name} failed: {e}") + errors.append((provider.name, str(e))) + + # Mark provider as unhealthy + if self.proxy_client.track_errors: + provider.is_healthy = False + provider.last_error = str(e) - # All providers failed, try fallback client - if self.proxy_client.fallback_client: - logger.warning("All proxy providers failed, using fallback client") - try: - return self.proxy_client.fallback_client.chat.completions.create(**self._filter_kwargs(kwargs)) - except Exception as e: - errors.append(("fallback_client", str(e))) - - # Complete failure - error_msg = f"All providers failed. Errors: {errors}" - logger.error(error_msg) - raise Exception(error_msg) \ No newline at end of file + # All providers failed, try fallback client + if self.proxy_client.fallback_client: + logger.warning("All proxy providers failed, using fallback client") + try: + fallback_kwargs = self._filter_kwargs(kwargs) + fallback_kwargs['timeout'] = self.proxy_client.request_timeout + return self.proxy_client.fallback_client.chat.completions.create(**fallback_kwargs) + except Exception as e: + errors.append(("fallback_client", str(e))) + + # Complete failure + error_msg = f"All providers failed. Errors: {errors}" + logger.error(error_msg) + raise Exception(error_msg) + + finally: + # Release semaphore to allow next request + self.proxy_client._request_semaphore.release() \ No newline at end of file diff --git a/optillm/plugins/proxy/config.py b/optillm/plugins/proxy/config.py index 38031f3e..e5f0c684 100644 --- a/optillm/plugins/proxy/config.py +++ b/optillm/plugins/proxy/config.py @@ -137,6 +137,8 @@ def _apply_defaults(config: Dict) -> Dict: config.setdefault('providers', []) config.setdefault('routing', {}) config.setdefault('monitoring', {}) + config.setdefault('timeouts', {}) + config.setdefault('queue', {}) # Routing defaults routing = config['routing'] @@ -154,6 +156,16 @@ def _apply_defaults(config: Dict) -> Dict: monitoring.setdefault('track_latency', True) monitoring.setdefault('track_errors', True) + # Timeout defaults + timeouts = config['timeouts'] + timeouts.setdefault('request', 30) # 30 seconds for requests + timeouts.setdefault('connect', 5) # 5 seconds for connection + + # Queue management defaults + queue = config['queue'] + queue.setdefault('max_concurrent', 100) # Max concurrent requests + queue.setdefault('timeout', 60) # Max time waiting in queue + # Provider defaults for i, provider in enumerate(config['providers']): provider.setdefault('name', f"provider_{i}") @@ -224,6 +236,14 @@ def _create_default(path: Path): interval: 30 # seconds timeout: 5 # seconds +timeouts: + request: 30 # Maximum time for a request (seconds) + connect: 5 # Maximum time for connection (seconds) + +queue: + max_concurrent: 100 # Maximum concurrent requests + timeout: 60 # Maximum time in queue (seconds) + monitoring: log_level: INFO track_latency: true @@ -244,6 +264,14 @@ def _get_minimal_config() -> Dict: 'strategy': 'round_robin', 'health_check': {'enabled': False} }, + 'timeouts': { + 'request': 30, + 'connect': 5 + }, + 'queue': { + 'max_concurrent': 100, + 'timeout': 60 + }, 'monitoring': { 'log_level': 'INFO', 'track_latency': False, diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 23e3c3d9..2370ee29 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -199,6 +199,97 @@ def test_proxy_plugin_token_counts(): assert result['usage']['total_tokens'] == 15 +def test_proxy_plugin_timeout_config(): + """Test that proxy plugin properly configures timeout settings""" + from optillm.plugins.proxy.config import ProxyConfig + import tempfile + import yaml + + # Create test config with timeout settings + config = { + "providers": [ + { + "name": "test_provider", + "base_url": "http://localhost:8000/v1", + "api_key": "test-key" + } + ], + "timeouts": { + "request": 10, + "connect": 3 + }, + "queue": { + "max_concurrent": 50, + "timeout": 30 + } + } + + # Write config to temp file + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + yaml.dump(config, f) + config_path = f.name + + try: + # Load config and verify timeout settings + loaded_config = ProxyConfig.load(config_path) + + assert 'timeouts' in loaded_config, "Config should contain timeouts section" + assert loaded_config['timeouts']['request'] == 10, "Request timeout should be 10" + assert loaded_config['timeouts']['connect'] == 3, "Connect timeout should be 3" + + assert 'queue' in loaded_config, "Config should contain queue section" + assert loaded_config['queue']['max_concurrent'] == 50, "Max concurrent should be 50" + assert loaded_config['queue']['timeout'] == 30, "Queue timeout should be 30" + + finally: + import os + os.unlink(config_path) + + +def test_proxy_plugin_timeout_handling(): + """Test that proxy plugin handles timeouts correctly""" + from optillm.plugins.proxy.client import ProxyClient + from unittest.mock import Mock, patch + import concurrent.futures + + # Create config with short timeout + config = { + "providers": [ + { + "name": "slow_provider", + "base_url": "http://localhost:8001/v1", + "api_key": "test-key-1" + }, + { + "name": "fast_provider", + "base_url": "http://localhost:8002/v1", + "api_key": "test-key-2" + } + ], + "routing": { + "strategy": "round_robin", + "health_check": {"enabled": False} + }, + "timeouts": { + "request": 2, + "connect": 1 + }, + "queue": { + "max_concurrent": 10, + "timeout": 5 + } + } + + # Create proxy client + proxy_client = ProxyClient(config) + + # Verify timeout settings are loaded + assert proxy_client.request_timeout == 2, "Request timeout should be 2" + assert proxy_client.connect_timeout == 1, "Connect timeout should be 1" + assert proxy_client.max_concurrent_requests == 10, "Max concurrent should be 10" + assert proxy_client.queue_timeout == 5, "Queue timeout should be 5" + + def test_plugin_subdirectory_imports(): """Test all plugins with subdirectories can import their submodules""" # Test deep_research @@ -333,6 +424,18 @@ def test_no_relative_import_errors(): except Exception as e: print(f"❌ Proxy plugin token counts test failed: {e}") + try: + test_proxy_plugin_timeout_config() + print("✅ Proxy plugin timeout config test passed") + except Exception as e: + print(f"❌ Proxy plugin timeout config test failed: {e}") + + try: + test_proxy_plugin_timeout_handling() + print("✅ Proxy plugin timeout handling test passed") + except Exception as e: + print(f"❌ Proxy plugin timeout handling test failed: {e}") + try: test_plugin_subdirectory_imports() print("✅ Plugin subdirectory imports test passed") From e5a0dcc10548bc9ea9ca75dc137f38dbf5427dae Mon Sep 17 00:00:00 2001 From: Asankhaya Sharma Date: Tue, 9 Sep 2025 09:24:38 +0800 Subject: [PATCH 2/2] bump versions --- optillm/__init__.py | 2 +- pyproject.toml | 2 +- tests/test_plugins.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/optillm/__init__.py b/optillm/__init__.py index 2b769969..cc9aea13 100644 --- a/optillm/__init__.py +++ b/optillm/__init__.py @@ -1,5 +1,5 @@ # Version information -__version__ = "0.2.5" +__version__ = "0.2.6" # Import from server module from .server import ( diff --git a/pyproject.toml b/pyproject.toml index bfb46d29..4bce6bb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "optillm" -version = "0.2.5" +version = "0.2.6" description = "An optimizing inference proxy for LLMs." readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 2370ee29..5f91fe1d 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -234,8 +234,8 @@ def test_proxy_plugin_timeout_config(): loaded_config = ProxyConfig.load(config_path) assert 'timeouts' in loaded_config, "Config should contain timeouts section" - assert loaded_config['timeouts']['request'] == 10, "Request timeout should be 10" - assert loaded_config['timeouts']['connect'] == 3, "Connect timeout should be 3" + assert loaded_config['timeouts'].get('request') == 10, "Request timeout should be 10" + assert loaded_config['timeouts'].get('connect') == 3, "Connect timeout should be 3" assert 'queue' in loaded_config, "Config should contain queue section" assert loaded_config['queue']['max_concurrent'] == 50, "Max concurrent should be 50"