Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions src/neo4j/_async_compat/network/_bolt_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,20 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl):
log.debug("[#0000] S: <TIMEOUT> %s", resolved_address)
log.debug("[#0000] C: <CLOSE> %s", resolved_address)
if s:
await cls.close_socket(s)
cls._kill_raw_socket(s)
raise ServiceUnavailable(
"Timed out trying to establish connection to {!r}".format(
resolved_address)) from None
except asyncio.CancelledError:
log.debug("[#0000] S: <CANCELLED> %s", resolved_address)
log.debug("[#0000] C: <CLOSE> %s", resolved_address)
if s:
await cls.close_socket(s)
cls._kill_raw_socket(s)
raise
except (SSLError, CertificateError) as error:
local_port = s.getsockname()[1]
if s:
await cls.close_socket(s)
cls._kill_raw_socket(s)
raise BoltSecurityError(
message="Failed to establish encrypted connection.",
address=(resolved_address._host_name, local_port)
Expand All @@ -263,7 +263,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl):
" ".join(map(repr, error.args)))
log.debug("[#0000] C: <CLOSE> %s", resolved_address)
if s:
await cls.close_socket(s)
cls._kill_raw_socket(s)
if isinstance(error, OSError):
raise ServiceUnavailable(
"Failed to establish connection to {!r} (reason {})"
Expand Down Expand Up @@ -350,14 +350,18 @@ async def close_socket(cls, socket_):
except OSError:
pass
else:
try:
socket_.shutdown(SHUT_RDWR)
except OSError:
pass
try:
socket_.close()
except OSError:
pass
cls._kill_raw_socket(socket_)

@classmethod
def _kill_raw_socket(cls, socket_):
try:
socket_.shutdown(SHUT_RDWR)
except OSError:
pass
try:
socket_.close()
except OSError:
pass

@classmethod
async def connect(cls, address, *, tcp_timeout, deadline,
Expand Down Expand Up @@ -409,7 +413,10 @@ async def connect(cls, address, *, tcp_timeout, deadline,
log.debug("[#%04X] C: <CANCELED> %s", local_port,
resolved_address)
if s:
await cls.close_socket(s)
try:
s.kill()
except OSError:
pass
raise
except Exception:
if s:
Expand Down Expand Up @@ -528,15 +535,15 @@ def _connect(cls, resolved_address, timeout, keep_alive):
except SocketTimeout:
log.debug("[#0000] S: <TIMEOUT> %s", resolved_address)
log.debug("[#0000] C: <CLOSE> %s", resolved_address)
cls.close_socket(s)
cls._kill_raw_socket(s)
raise ServiceUnavailable(
"Timed out trying to establish connection to {!r}".format(
resolved_address))
except Exception as error:
log.debug("[#0000] S: <ERROR> %s %s", type(error).__name__,
" ".join(map(repr, error.args)))
log.debug("[#0000] C: <CLOSE> %s", resolved_address)
cls.close_socket(s)
cls._kill_raw_socket(s)
if isinstance(error, OSError):
raise ServiceUnavailable(
"Failed to establish connection to {!r} (reason {})"
Expand All @@ -554,7 +561,7 @@ def _secure(cls, s, host, ssl_context):
sni_host = host if HAS_SNI and host else None
s = ssl_context.wrap_socket(s, server_hostname=sni_host)
except (OSError, SSLError, CertificateError) as cause:
cls.close_socket(s)
cls._kill_raw_socket(s)
raise BoltSecurityError(
message="Failed to establish encrypted connection.",
address=(host, local_port)
Expand Down Expand Up @@ -615,15 +622,15 @@ def _handshake(cls, s, resolved_address, deadline):
# If no data is returned after a successful select
# response, the server has closed the connection
log.debug("[#%04X] S: <CLOSE>", local_port)
cls.close_socket(s)
cls._kill_raw_socket(s)
raise ServiceUnavailable(
f"Connection to {resolved_address} closed without handshake "
"response"
)
if data_size != 4:
# Some garbled data has been received
log.debug("[#%04X] S: @*#!", local_port)
cls.close_socket(s)
cls._kill_raw_socket(s)
raise BoltProtocolError(
"Expected four byte Bolt handshake response from "
f"{resolved_address!r}, received {response!r} instead; "
Expand All @@ -632,7 +639,7 @@ def _handshake(cls, s, resolved_address, deadline):
)
elif response == b"HTTP":
log.debug("[#%04X] S: <CLOSE>", local_port)
cls.close_socket(s)
cls._kill_raw_socket(s)
raise ServiceUnavailable(
f"Cannot to connect to Bolt service on {resolved_address!r} "
"(looks like HTTP)"
Expand All @@ -646,6 +653,10 @@ def _handshake(cls, s, resolved_address, deadline):
def close_socket(cls, socket_):
if isinstance(socket_, BoltSocket):
socket_ = socket_._socket
cls._kill_raw_socket(socket_)

@classmethod
def _kill_raw_socket(cls, socket_):
try:
socket_.shutdown(SHUT_RDWR)
except OSError:
Expand Down