Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning-block-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.26"
lightning = { version = "0.0.14", path = "../lightning" }
tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
chunked_transfer = { version = "1.4", optional = true }
Expand Down
82 changes: 55 additions & 27 deletions lightning-block-sync/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ use std::net::TcpStream;
/// Timeout for operations on TCP streams.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout for reading the first byte of a response. This is separate from the general read
/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
/// time we allow Bitcoin Core to block for is twice this value.
const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);

/// Maximum HTTP message header size in bytes.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;

Expand Down Expand Up @@ -158,16 +164,19 @@ impl HttpClient {
let endpoint = self.stream.peer_addr().unwrap();
match self.send_request(request).await {
Ok(bytes) => Ok(bytes),
Err(e) => match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted |
std::io::ErrorKind::UnexpectedEof => {
// Reconnect if the connection was closed. This may happen if the server's
// keep-alive limits are reached.
*self = Self::connect(endpoint)?;
self.send_request(request).await
},
_ => Err(e),
Err(_) => {
// Reconnect and retry on fail. This can happen if the connection was closed after
// the keep-alive limits are reached, or generally if the request timed out due to
// Bitcoin Core being stuck on a long-running operation or its RPC queue being
// full.
// Block 100ms before retrying the request as in many cases the source of the error
// may be persistent for some time.
#[cfg(feature = "tokio")]
tokio::time::sleep(Duration::from_millis(100)).await;
#[cfg(not(feature = "tokio"))]
std::thread::sleep(Duration::from_millis(100));
*self = Self::connect(endpoint)?;
self.send_request(request).await
},
}
}
Expand Down Expand Up @@ -206,25 +215,44 @@ impl HttpClient {
#[cfg(not(feature = "tokio"))]
let mut reader = std::io::BufReader::new(limited_stream);

macro_rules! read_line { () => { {
let mut line = String::new();
#[cfg(feature = "tokio")]
let bytes_read = reader.read_line(&mut line).await?;
#[cfg(not(feature = "tokio"))]
let bytes_read = reader.read_line(&mut line)?;

match bytes_read {
0 => None,
_ => {
// Remove trailing CRLF
if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
Some(line)
},
}
} } }
macro_rules! read_line {
() => { read_line!(0) };
($retry_count: expr) => { {
let mut line = String::new();
let mut timeout_count: u64 = 0;
let bytes_read = loop {
#[cfg(feature = "tokio")]
let read_res = reader.read_line(&mut line).await;
#[cfg(not(feature = "tokio"))]
let read_res = reader.read_line(&mut line);
match read_res {
Ok(bytes_read) => break bytes_read,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any other errors we would want to retry on? Like Interrupted docs https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.Interrupted seems to indicate that it can probably be retried

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think you can get an Interrupted in a read on Linux and probably OSX. I'm not really sure what it would even mean in practice - usually it would imply another thread someone sent some signal to interrupt, but in a read not so much.

All that said, maybe it makes sense to just retry everything. Another potential error we could see is a Bitcoin Core RPC queue full error, which would present as just a generic HTTP error.

timeout_count += 1;
if timeout_count > $retry_count {
return Err(e);
} else {
continue;
}
}
Err(e) => return Err(e),
}
};

match bytes_read {
0 => None,
_ => {
// Remove trailing CRLF
if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
Some(line)
},
}
} }
}

// Read and parse status line
let status_line = read_line!()
// Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
let status = HttpStatus::parse(&status_line)?;

Expand Down