Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/neo4j/_async/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ async def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
await self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
await self.pool.on_neo4j_error(e, self)
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_async/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ async def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
await self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
if self.pool:
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_async/io/_bolt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ async def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
await self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
if self.pool:
Expand Down
18 changes: 10 additions & 8 deletions src/neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ async def deactivate(self, address):

await self._close_connections(closable_connections)

def on_write_failure(self, address):
async def on_write_failure(self, address):
raise WriteServiceUnavailable(
"No write service available for pool {}".format(self)
)
Expand Down Expand Up @@ -949,17 +949,19 @@ async def deactivate(self, address):
log.debug("[#0000] _: <POOL> deactivating address %r", address)
# We use `discard` instead of `remove` here since the former
# will not fail if the address has already been removed.
for database in self.routing_tables.keys():
self.routing_tables[database].routers.discard(address)
self.routing_tables[database].readers.discard(address)
self.routing_tables[database].writers.discard(address)
async with self.refresh_lock:
for database in self.routing_tables.keys():
self.routing_tables[database].routers.discard(address)
self.routing_tables[database].readers.discard(address)
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] _: <POOL> table=%r", self.routing_tables)
await super(AsyncNeo4jPool, self).deactivate(address)

def on_write_failure(self, address):
async def on_write_failure(self, address):
""" Remove a writer address from the routing table, if present.
"""
log.debug("[#0000] _: <POOL> removing writer %r", address)
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
async with self.refresh_lock:
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] _: <POOL> table=%r", self.routing_tables)
4 changes: 3 additions & 1 deletion src/neo4j/_sync/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
self.pool.on_neo4j_error(e, self)
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_sync/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
if self.pool:
Expand Down
4 changes: 3 additions & 1 deletion src/neo4j/_sync/io/_bolt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ def _process_message(self, tag, fields):
raise
except (NotALeader, ForbiddenOnReadOnlyDatabase):
if self.pool:
self.pool.on_write_failure(address=self.unresolved_address)
self.pool.on_write_failure(
address=self.unresolved_address
)
raise
except Neo4jError as e:
if self.pool:
Expand Down
14 changes: 8 additions & 6 deletions src/neo4j/_sync/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,17 +946,19 @@ def deactivate(self, address):
log.debug("[#0000] _: <POOL> deactivating address %r", address)
# We use `discard` instead of `remove` here since the former
# will not fail if the address has already been removed.
for database in self.routing_tables.keys():
self.routing_tables[database].routers.discard(address)
self.routing_tables[database].readers.discard(address)
self.routing_tables[database].writers.discard(address)
with self.refresh_lock:
for database in self.routing_tables.keys():
self.routing_tables[database].routers.discard(address)
self.routing_tables[database].readers.discard(address)
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] _: <POOL> table=%r", self.routing_tables)
super(Neo4jPool, self).deactivate(address)

def on_write_failure(self, address):
""" Remove a writer address from the routing table, if present.
"""
log.debug("[#0000] _: <POOL> removing writer %r", address)
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
with self.refresh_lock:
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] _: <POOL> table=%r", self.routing_tables)