From b465318e12e4374d7e7cb3c00036fded340f155a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 7 May 2021 22:40:22 +0000 Subject: [PATCH 1/2] Allow retrying HTTP requests if we hit a socket timeout --- lightning-block-sync/Cargo.toml | 2 +- lightning-block-sync/src/http.rs | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml index 6125874d963..00208d78b9b 100644 --- a/lightning-block-sync/Cargo.toml +++ b/lightning-block-sync/Cargo.toml @@ -16,7 +16,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ] [dependencies] bitcoin = "0.26" lightning = { version = "0.0.14", path = "../lightning" } -tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true } +tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } chunked_transfer = { version = "1.4", optional = true } diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index f745e138fb9..1f5f046c0f8 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -158,16 +158,19 @@ impl HttpClient { let endpoint = self.stream.peer_addr().unwrap(); match self.send_request(request).await { Ok(bytes) => Ok(bytes), - Err(e) => match e.kind() { - std::io::ErrorKind::ConnectionReset | - std::io::ErrorKind::ConnectionAborted | - std::io::ErrorKind::UnexpectedEof => { - // Reconnect if the connection was closed. This may happen if the server's - // keep-alive limits are reached. - *self = Self::connect(endpoint)?; - self.send_request(request).await - }, - _ => Err(e), + Err(_) => { + // Reconnect and retry on fail. This can happen if the connection was closed after + // the keep-alive limits are reached, or generally if the request timed out due to + // Bitcoin Core being stuck on a long-running operation or its RPC queue being + // full. + // Block 100ms before retrying the request as in many cases the source of the error + // may be persistent for some time. + #[cfg(feature = "tokio")] + tokio::time::sleep(Duration::from_millis(100)).await; + #[cfg(not(feature = "tokio"))] + std::thread::sleep(Duration::from_millis(100)); + *self = Self::connect(endpoint)?; + self.send_request(request).await }, } } From 4ade6bcb69a1b93c9b5ddd991398abbd536906d0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 6 May 2021 20:42:02 +0000 Subject: [PATCH 2/2] Increase the timeout for RPC responses from Bitcoin Core Early sample testing showed multiple users hitting EWOULDBLOCK/EAGAIN waiting for an initial response from Bitcoin Core while it was doing some long operation (eg UTXO cache flushing). Instead of only waiting 5 seconds for each attempt, we now wait a full two minutes, but only for the first header response, not each byte. --- lightning-block-sync/src/http.rs | 59 +++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index 1f5f046c0f8..2cfb8e50593 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -24,6 +24,12 @@ use std::net::TcpStream; /// Timeout for operations on TCP streams. const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5); +/// Timeout for reading the first byte of a response. This is separate from the general read +/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for +/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum +/// time we allow Bitcoin Core to block for is twice this value. +const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120); + /// Maximum HTTP message header size in bytes. const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192; @@ -209,25 +215,44 @@ impl HttpClient { #[cfg(not(feature = "tokio"))] let mut reader = std::io::BufReader::new(limited_stream); - macro_rules! read_line { () => { { - let mut line = String::new(); - #[cfg(feature = "tokio")] - let bytes_read = reader.read_line(&mut line).await?; - #[cfg(not(feature = "tokio"))] - let bytes_read = reader.read_line(&mut line)?; - - match bytes_read { - 0 => None, - _ => { - // Remove trailing CRLF - if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } - Some(line) - }, - } - } } } + macro_rules! read_line { + () => { read_line!(0) }; + ($retry_count: expr) => { { + let mut line = String::new(); + let mut timeout_count: u64 = 0; + let bytes_read = loop { + #[cfg(feature = "tokio")] + let read_res = reader.read_line(&mut line).await; + #[cfg(not(feature = "tokio"))] + let read_res = reader.read_line(&mut line); + match read_res { + Ok(bytes_read) => break bytes_read, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + timeout_count += 1; + if timeout_count > $retry_count { + return Err(e); + } else { + continue; + } + } + Err(e) => return Err(e), + } + }; + + match bytes_read { + 0 => None, + _ => { + // Remove trailing CRLF + if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } + Some(line) + }, + } + } } + } // Read and parse status line - let status_line = read_line!() + // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT. + let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; let status = HttpStatus::parse(&status_line)?;