@@ -27,6 +27,14 @@ def __init__(self, config: Dict):
2727 self .last_error = None
2828 self .latencies = [] # Track recent latencies
2929
30+ # Per-provider concurrency control
31+ self .max_concurrent = config .get ('max_concurrent' , None ) # None means no limit
32+ if self .max_concurrent is not None :
33+ self ._semaphore = threading .Semaphore (self .max_concurrent )
34+ logger .info (f"Provider { self .name } limited to { self .max_concurrent } concurrent requests" )
35+ else :
36+ self ._semaphore = None
37+
3038 @property
3139 def client (self ):
3240 """Lazy initialization of OpenAI client"""
@@ -39,6 +47,13 @@ def client(self):
3947 api_version = "2024-02-01" ,
4048 max_retries = 0 # Disable client retries - we handle them
4149 )
50+ elif 'generativelanguage.googleapis.com' in self .base_url :
51+ # Google AI client - create custom client to avoid "models/" prefix
52+ from optillm .plugins .proxy .google_client import GoogleAIClient
53+ self ._client = GoogleAIClient (
54+ api_key = self .api_key ,
55+ base_url = self .base_url
56+ )
4257 else :
4358 # Standard OpenAI-compatible client
4459 self ._client = OpenAI (
@@ -63,6 +78,28 @@ def avg_latency(self) -> float:
6378 if not self .latencies :
6479 return 0
6580 return sum (self .latencies ) / len (self .latencies )
81+
def acquire_slot(self, timeout: Optional[float] = None) -> bool:
    """
    Try to acquire a concurrency slot for this provider.

    Blocks for up to ``timeout`` seconds waiting for a slot to free up
    (waits indefinitely when ``timeout`` is None).

    Args:
        timeout: Maximum seconds to wait for a slot, or None to wait forever.

    Returns:
        True if a slot was acquired, or if this provider has no concurrency
        limit configured. False only when the wait timed out before a slot
        became available.
    """
    if self._semaphore is None:
        return True  # No limit configured, always available

    # threading.Semaphore.acquire returns False when the timeout expires.
    return self._semaphore.acquire(blocking=True, timeout=timeout)
91+
def release_slot(self):
    """Return a previously acquired concurrency slot to this provider."""
    # Providers without a configured limit have no semaphore to release.
    if self._semaphore is None:
        return
    self._semaphore.release()
96+
def available_slots(self) -> Optional[int]:
    """Number of free request slots, or None when this provider is unlimited."""
    sem = self._semaphore
    if sem is None:
        return None
    # threading.Semaphore exposes no public counter; _value is the
    # internal count of currently available permits.
    return sem._value
66103
67104class ProxyClient :
68105 """OpenAI-compatible client that proxies to multiple providers"""
@@ -185,6 +222,13 @@ def create(self, **kwargs):
185222
186223 attempted_providers .add (provider )
187224
225+ # Try to acquire a slot for this provider (with reasonable timeout for queueing)
226+ slot_timeout = 10.0 # Wait up to 10 seconds for provider to become available
227+ if not provider .acquire_slot (timeout = slot_timeout ):
228+ logger .debug (f"Provider { provider .name } at max capacity, trying next provider" )
229+ errors .append ((provider .name , "At max concurrent requests" ))
230+ continue
231+
188232 try :
189233 # Map model name if needed and filter out OptiLLM-specific parameters
190234 request_kwargs = self ._filter_kwargs (kwargs .copy ())
@@ -225,6 +269,11 @@ def create(self, **kwargs):
225269 if self .proxy_client .track_errors :
226270 provider .is_healthy = False
227271 provider .last_error = str (e )
272+
273+ finally :
274+ # Always release the provider slot
275+ provider .release_slot ()
276+ logger .debug (f"Released slot for provider { provider .name } " )
228277
229278 # All providers failed, try fallback client
230279 if self .proxy_client .fallback_client :
0 commit comments