55import logging
66import random
77from typing import Dict , List , Any , Optional
8+ import concurrent .futures
9+ import threading
810from openai import OpenAI , AzureOpenAI
911from optillm .plugins .proxy .routing import RouterFactory
1012from optillm .plugins .proxy .health import HealthChecker
@@ -34,13 +36,15 @@ def client(self):
3436 self ._client = AzureOpenAI (
3537 api_key = self .api_key ,
3638 azure_endpoint = self .base_url ,
37- api_version = "2024-02-01"
39+ api_version = "2024-02-01" ,
40+ max_retries = 0 # Disable client retries - we handle them
3841 )
3942 else :
4043 # Standard OpenAI-compatible client
4144 self ._client = OpenAI (
4245 api_key = self .api_key ,
43- base_url = self .base_url
46+ base_url = self .base_url ,
47+ max_retries = 0 # Disable client retries - we handle them
4448 )
4549 return self ._client
4650
@@ -97,6 +101,17 @@ def __init__(self, config: Dict, fallback_client=None):
97101 # Start health checking
98102 self .health_checker .start ()
99103
104+ # Timeout settings
105+ timeout_config = config .get ('timeouts' , {})
106+ self .request_timeout = timeout_config .get ('request' , 30 ) # Default 30 seconds
107+ self .connect_timeout = timeout_config .get ('connect' , 5 ) # Default 5 seconds
108+
109+ # Queue management settings
110+ queue_config = config .get ('queue' , {})
111+ self .max_concurrent_requests = queue_config .get ('max_concurrent' , 100 )
112+ self .queue_timeout = queue_config .get ('timeout' , 60 ) # Max time in queue
113+ self ._request_semaphore = threading .Semaphore (self .max_concurrent_requests )
114+
100115 # Monitoring settings
101116 monitoring = config .get ('monitoring' , {})
102117 self .track_latency = monitoring .get ('track_latency' , True )
@@ -123,74 +138,109 @@ def _filter_kwargs(self, kwargs: dict) -> dict:
123138 }
124139 return {k : v for k , v in kwargs .items () if k not in optillm_params }
125140
def _make_request_with_timeout(self, provider, request_kwargs):
    """Send one chat-completion request to ``provider`` and normalize timeouts.

    The timeout itself is not enforced here: the caller is expected to put a
    ``timeout`` entry in ``request_kwargs`` so the underlying client applies it.
    This wrapper only translates timeout-looking failures into the builtin
    ``TimeoutError`` so callers can handle them uniformly.

    Args:
        provider: Object exposing ``name`` and ``client.chat.completions.create``.
        request_kwargs: Keyword arguments forwarded unchanged to the client.

    Returns:
        Whatever ``provider.client.chat.completions.create`` returns.

    Raises:
        TimeoutError: If the client raised an exception whose message mentions
            "timeout" or "timed out". The original exception is attached as
            ``__cause__``.
        Exception: Any other client exception is re-raised unchanged.
    """
    try:
        return provider.client.chat.completions.create(**request_kwargs)
    except Exception as e:
        # Timeouts are recognised by message text, so this works for any
        # client library without importing its exception classes.
        message = str(e).lower()
        if "timeout" in message or "timed out" in message:
            # Chain the original error so its type and traceback stay
            # available to whoever debugs the failure.
            raise TimeoutError(
                f"Request to {provider.name} timed out after {self.proxy_client.request_timeout}s"
            ) from e
        # Bare raise keeps the original traceback intact.
        raise
152+
def create(self, **kwargs):
    """Create a completion with load balancing, failover, and timeout handling.

    A slot on the proxy client's request semaphore is held for the whole call
    and released in ``finally``. Providers are tried one at a time, as chosen
    by the router, until one succeeds. A provider that fails is recorded in
    ``errors`` and, when error tracking is enabled, marked unhealthy. If every
    provider fails, the optional fallback client is tried last.

    Args:
        **kwargs: Chat-completion arguments. ``model`` is mapped per provider,
            and ``self._filter_kwargs`` is applied before each request.

    Returns:
        The response from the first provider, or the fallback client, that
        succeeds.

    Raises:
        TimeoutError: If no semaphore slot becomes free within
            ``queue_timeout`` seconds.
        Exception: If every provider and the fallback client fail. The
            message lists the collected ``(name, error)`` pairs.
    """
    proxy = self.proxy_client

    # Check queue capacity: wait at most queue_timeout seconds for a slot.
    if not proxy._request_semaphore.acquire(blocking=True, timeout=proxy.queue_timeout):
        raise TimeoutError(f"Request queue timeout after {proxy.queue_timeout}s - server overloaded")

    try:
        model = kwargs.get('model', 'unknown')
        attempted_providers = set()
        errors = []

        # Get healthy providers
        healthy_providers = [p for p in proxy.active_providers if p.is_healthy]

        if not healthy_providers:
            logger.warning("No healthy providers, trying fallback providers")
            healthy_providers = proxy.fallback_providers

        # Try routing through healthy providers, never retrying the same one.
        while healthy_providers:
            available_providers = [p for p in healthy_providers if p not in attempted_providers]
            if not available_providers:
                break

            provider = proxy.router.select(available_providers)
            logger.info(f"Router selected provider: {provider.name if provider else 'None'}")

            if not provider:
                break

            attempted_providers.add(provider)

            try:
                # Map model name if needed and filter out OptiLLM-specific parameters
                request_kwargs = self._filter_kwargs(kwargs.copy())
                request_kwargs['model'] = provider.map_model(model)

                # Per-request timeout, passed through to the provider's client.
                request_kwargs['timeout'] = proxy.request_timeout

                # Track timing
                start_time = time.time()

                logger.debug(f"Routing to {provider.name} with {proxy.request_timeout}s timeout")
                response = self._make_request_with_timeout(provider, request_kwargs)

                # Track success
                latency = time.time() - start_time
                if proxy.track_latency:
                    provider.track_latency(latency)

                logger.info(f"Request succeeded via {provider.name} in {latency:.2f}s")
                return response

            except Exception as e:
                # Timeouts and other failures share the same bookkeeping and
                # differ only in the log text and the stored last_error.
                timed_out = isinstance(e, TimeoutError)
                if timed_out:
                    logger.error(f"Provider {provider.name} timed out: {e}")
                else:
                    logger.error(f"Provider {provider.name} failed: {e}")
                errors.append((provider.name, str(e)))

                # Mark provider as unhealthy so it is skipped until it recovers.
                if proxy.track_errors:
                    provider.is_healthy = False
                    provider.last_error = f"Timeout: {e}" if timed_out else str(e)

        # All providers failed, try fallback client
        if proxy.fallback_client:
            logger.warning("All proxy providers failed, using fallback client")
            try:
                fallback_kwargs = self._filter_kwargs(kwargs)
                fallback_kwargs['timeout'] = proxy.request_timeout
                return proxy.fallback_client.chat.completions.create(**fallback_kwargs)
            except Exception as e:
                # Log it like a provider failure instead of recording it silently.
                logger.error(f"Fallback client failed: {e}")
                errors.append(("fallback_client", str(e)))

        # Complete failure
        error_msg = f"All providers failed. Errors: {errors}"
        logger.error(error_msg)
        raise Exception(error_msg)

    finally:
        # Release semaphore to allow next request
        proxy._request_semaphore.release()
0 commit comments