1
1
import threading
2
- import socket
3
2
from typing import List , Any , Callable , Optional
4
3
5
4
from redis .background import BackgroundScheduler
6
- from redis .client import PubSubWorkerThread
7
- from redis .exceptions import ConnectionError , TimeoutError
8
5
from redis .commands import RedisModuleCommands , CoreCommands
9
6
from redis .multidb .command_executor import DefaultCommandExecutor
10
7
from redis .multidb .config import MultiDbConfig , DEFAULT_GRACE_PERIOD
11
- from redis .multidb .circuit import State as CBState , CircuitBreaker
12
- from redis .multidb .database import Database , AbstractDatabase , Databases
8
+ from redis .multidb .circuit import State as CBState , SyncCircuitBreaker
9
+ from redis .multidb .database import Database , Databases , SyncDatabase
13
10
from redis .multidb .exception import NoValidDatabaseException
14
11
from redis .multidb .failure_detector import FailureDetector
15
12
from redis .multidb .healthcheck import HealthCheck
@@ -92,7 +89,7 @@ def get_databases(self) -> Databases:
92
89
"""
93
90
return self ._databases
94
91
95
- def set_active_database (self , database : AbstractDatabase ) -> None :
92
+ def set_active_database (self , database : SyncDatabase ) -> None :
96
93
"""
97
94
Promote one of the existing databases to become an active.
98
95
"""
@@ -115,7 +112,7 @@ def set_active_database(self, database: AbstractDatabase) -> None:
115
112
116
113
raise NoValidDatabaseException ('Cannot set active database, database is unhealthy' )
117
114
118
- def add_database (self , database : AbstractDatabase ):
115
+ def add_database (self , database : SyncDatabase ):
119
116
"""
120
117
Adds a new database to the database list.
121
118
"""
@@ -129,7 +126,7 @@ def add_database(self, database: AbstractDatabase):
129
126
self ._databases .add (database , database .weight )
130
127
self ._change_active_database (database , highest_weighted_db )
131
128
132
- def _change_active_database (self , new_database : AbstractDatabase , highest_weight_database : AbstractDatabase ):
129
+ def _change_active_database (self , new_database : SyncDatabase , highest_weight_database : SyncDatabase ):
133
130
if new_database .weight > highest_weight_database .weight and new_database .circuit .state == CBState .CLOSED :
134
131
self .command_executor .active_database = new_database
135
132
@@ -143,7 +140,7 @@ def remove_database(self, database: Database):
143
140
if highest_weight <= weight and highest_weighted_db .circuit .state == CBState .CLOSED :
144
141
self .command_executor .active_database = highest_weighted_db
145
142
146
- def update_database_weight (self , database : AbstractDatabase , weight : float ):
143
+ def update_database_weight (self , database : SyncDatabase , weight : float ):
147
144
"""
148
145
Updates a database from the database list.
149
146
"""
@@ -210,7 +207,7 @@ def pubsub(self, **kwargs):
210
207
211
208
return PubSub (self , ** kwargs )
212
209
213
- def _check_db_health (self , database : AbstractDatabase , on_error : Callable [[Exception ], None ] = None ) -> None :
210
+ def _check_db_health (self , database : SyncDatabase , on_error : Callable [[Exception ], None ] = None ) -> None :
214
211
"""
215
212
Runs health checks on the given database until first failure.
216
213
"""
@@ -247,15 +244,15 @@ def _check_databases_health(self, on_error: Callable[[Exception], None] = None):
247
244
for database , _ in self ._databases :
248
245
self ._check_db_health (database , on_error )
249
246
250
- def _on_circuit_state_change_callback (self , circuit : CircuitBreaker , old_state : CBState , new_state : CBState ):
247
+ def _on_circuit_state_change_callback (self , circuit : SyncCircuitBreaker , old_state : CBState , new_state : CBState ):
251
248
if new_state == CBState .HALF_OPEN :
252
249
self ._check_db_health (circuit .database )
253
250
return
254
251
255
252
if old_state == CBState .CLOSED and new_state == CBState .OPEN :
256
253
self ._bg_scheduler .run_once (DEFAULT_GRACE_PERIOD , _half_open_circuit , circuit )
257
254
258
- def _half_open_circuit (circuit : CircuitBreaker ):
255
+ def _half_open_circuit (circuit : SyncCircuitBreaker ):
259
256
circuit .state = CBState .HALF_OPEN
260
257
261
258
@@ -450,8 +447,8 @@ def run_in_thread(
450
447
exception_handler : Optional [Callable ] = None ,
451
448
sharded_pubsub : bool = False ,
452
449
) -> "PubSubWorkerThread" :
453
- return self ._client .command_executor .execute_pubsub_run_in_thread (
454
- sleep_time = sleep_time ,
450
+ return self ._client .command_executor .execute_pubsub_run (
451
+ sleep_time ,
455
452
daemon = daemon ,
456
453
exception_handler = exception_handler ,
457
454
pubsub = self ,
0 commit comments