3535 SlidingExpirationCacheWithCleanupThread
3636from aws_advanced_python_wrapper .utils .telemetry .telemetry import (
3737 TelemetryContext , TelemetryFactory , TelemetryTraceLevel )
38+ from aws_advanced_python_wrapper .utils .utils import LogUtils
3839
3940if TYPE_CHECKING :
4041 from aws_advanced_python_wrapper .driver_dialect import DriverDialect
@@ -50,6 +51,10 @@ class LimitlessConnectionPlugin(Plugin):
5051 def __init__ (self , plugin_service : PluginService , props : Properties ):
5152 self ._plugin_service = plugin_service
5253 self ._properties = props
54+ self ._limitless_router_service = LimitlessRouterService (
55+ self ._plugin_service ,
56+ LimitlessQueryHelper (self ._plugin_service )
57+ )
5358
5459 @property
5560 def subscribed_methods (self ) -> Set [str ]:
@@ -68,34 +73,27 @@ def connect(
6873
6974 dialect : DatabaseDialect = self ._plugin_service .database_dialect
7075 if not isinstance (dialect , AuroraLimitlessDialect ):
71- connection = connect_func ()
7276 refreshed_dialect = self ._plugin_service .database_dialect
73-
7477 if not isinstance (refreshed_dialect , AuroraLimitlessDialect ):
7578 raise UnsupportedOperationError (
7679 Messages .get_formatted ("LimitlessConnectionPlugin.UnsupportedDialectOrDatabase" ,
7780 type (refreshed_dialect ).__name__ ))
7881
79- limitless_router_service = LimitlessRouterService (
80- self ._plugin_service ,
81- LimitlessQueryHelper (self ._plugin_service )
82- )
83-
8482 if is_initial_connection :
85- limitless_router_service .start_monitoring (host_info , props )
83+ self . _limitless_router_service .start_monitoring (host_info , props )
8684
87- context : LimitlessConnectionContext = LimitlessConnectionContext (
85+ self . _context : LimitlessConnectionContext = LimitlessConnectionContext (
8886 host_info ,
8987 props ,
9088 connection ,
9189 connect_func ,
9290 [],
9391 self
9492 )
95- limitless_router_service . establish_connection (context )
96- connection = context .get_connection ()
93+ self . _limitless_router_service . establish_connection (self . _context )
94+ connection = self . _context .get_connection ()
9795 if connection is not None and not self ._plugin_service .driver_dialect .is_closed (connection ):
98- return context . get_connection ()
96+ return connection
9997
10098 raise AwsWrapperError (Messages .get_formatted ("LimitlessConnectionPlugin.FailedToConnectToHost" , host_info .host ))
10199
@@ -172,6 +170,7 @@ def run(self):
172170 lambda _ : new_limitless_routers ,
173171 WrapperProperties .LIMITLESS_MONITOR_DISPOSAL_TIME_MS .get (
174172 self ._properties ) * 1_000_000 )
173+ logger .debug (LogUtils .log_topology (tuple (new_limitless_routers ), "[limitlessRouterMonitor] Topology:" ))
175174
176175 sleep (self ._interval_ms / 1000 )
177176
@@ -331,18 +330,19 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
331330 self ._plugin_service .host_list_provider .get_cluster_id (), context .get_props ()))
332331
333332 if context .get_limitless_routers () is None or len (context .get_limitless_routers ()) == 0 :
334- logger .debug ("LimitlessRouterServiceImpl.limitlessRouterCacheEmpty " )
333+ logger .debug ("LimitlessRouterService.:LimitlessRouterCacheEmpty " )
335334
336335 wait_for_router_info = WrapperProperties .WAIT_FOR_ROUTER_INFO .get (context .get_props ())
337336 if wait_for_router_info :
338337 self ._synchronously_get_limitless_routers_with_retry (context )
339338 else :
340- logger .debug ("LimitlessRouterServiceImpl .UsingProvidedConnectUrl" )
339+ logger .debug ("LimitlessRouterService .UsingProvidedConnectUrl" )
341340 if context .get_connection () is None or self ._plugin_service .driver_dialect .is_closed (context .get_connection ()):
342341 context .set_connection (context .get_connect_func ()())
342+ return
343343
344- if context .get_host_info in context .get_limitless_routers ():
345- logger .debug ("LimitlessRouterServiceImpl .ConnectWithHost" )
344+ if context .get_host_info () in context .get_limitless_routers ():
345+ logger .debug ("LimitlessRouterService .ConnectWithHost" )
346346 if context .get_connection () is None :
347347 try :
348348 context .set_connection (context .get_connect_func ()())
@@ -356,7 +356,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
356356 try :
357357 selected_host_info = self ._plugin_service .get_host_info_by_strategy (
358358 HostRole .WRITER , "weighted_random" , context .get_limitless_routers ())
359- logger .debug ("LimitlessRouterServiceImpl .SelectedHost" , "None" if selected_host_info is None else selected_host_info .host )
359+ logger .debug ("LimitlessRouterService .SelectedHost" , "None" if selected_host_info is None else selected_host_info .host )
360360 except Exception as e :
361361 if self ._is_login_exception (e ) or isinstance (e , UnsupportedOperationError ):
362362 raise e
@@ -375,7 +375,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
375375 raise e
376376
377377 if selected_host_info is not None :
378- logger .debug ("LimitlessRouterServiceImpl .FailedToConnectToHost" , selected_host_info .host )
378+ logger .debug ("LimitlessRouterService .FailedToConnectToHost" , selected_host_info .host )
379379 selected_host_info .set_availability (HostAvailability .UNAVAILABLE )
380380
381381 self ._retry_connection_with_least_loaded_routers (context )
@@ -400,7 +400,7 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
400400 if (context .get_limitless_routers () is None
401401 or len (context .get_limitless_routers ()) == 0
402402 or not context .is_any_router_available ()):
403- logger .debug ("LimitlessRouterServiceImpl .NoRoutersAvailableForRetry" )
403+ logger .debug ("LimitlessRouterService .NoRoutersAvailableForRetry" )
404404
405405 if context .get_connection () is not None and not self ._plugin_service .driver_dialect .is_closed (context .get_connection ()):
406406 return
@@ -417,14 +417,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
417417
418418 try :
419419 selected_host_info = self ._plugin_service .get_host_info_by_strategy (
420- HostRole .WRITER , "weighted_random " , context .get_limitless_routers ())
421- logger .debug ("LimitlessRouterServiceImpl .SelectedHostForRetry" ,
420+ HostRole .WRITER , "highest_weight " , context .get_limitless_routers ())
421+ logger .debug ("LimitlessRouterService .SelectedHostForRetry" ,
422422 "None" if selected_host_info is None else selected_host_info .host )
423423 if selected_host_info is None :
424424 continue
425425
426426 except UnsupportedOperationError as e :
427- logger .error ("LimitlessRouterServiceImpl .IncorrectConfiguration" )
427+ logger .error ("LimitlessRouterService .IncorrectConfiguration" )
428428 raise e
429429 except AwsWrapperError :
430430 continue
@@ -438,14 +438,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
438438 if self ._is_login_exception (e ):
439439 raise e
440440 selected_host_info .set_availability (HostAvailability .UNAVAILABLE )
441- logger .debug ("LimitlessRouterServiceImpl .FailedToConnectToHost" , selected_host_info .host )
441+ logger .debug ("LimitlessRouterService .FailedToConnectToHost" , selected_host_info .host )
442442
443443 raise AwsWrapperError (Messages .get ("LimitlessRouterService.MaxRetriesExceeded" ))
444444
445445 def _synchronously_get_limitless_routers_with_retry (self , context : LimitlessConnectionContext ) -> None :
446- logger .debug ("LimitlessRouterServiceImpl .SynchronouslyGetLimitlessRouters" )
446+ logger .debug ("LimitlessRouterService .SynchronouslyGetLimitlessRouters" )
447447 retry_count = - 1
448- max_retries = WrapperProperties .MAX_RETRIES_MS .get_int (context .get_props ())
448+ max_retries = WrapperProperties .GET_ROUTER_MAX_RETRIES .get_int (context .get_props ())
449449 retry_interval_ms = WrapperProperties .GET_ROUTER_RETRY_INTERVAL_MS .get_float (context .get_props ())
450450 first_iteration = True
451451 while first_iteration or retry_count < max_retries :
0 commit comments