HTTP replication #537
It still does not perform any kind of handshake, just asks for frames starting at N. Tested with:
$ cargo run -- --http-replication-listen-addr 127.0.0.1:8081
$ curl -d '{"next_offset": 0}' -v localhost:8081/frames
Serves the same purpose as gRPC hello and provides:
- generation id
- generation start index
- database id
An arbitrary limit to make sure we do not overload memory.
With gRPC replication, it was reasonable to assume that there are listeners to the max frame number notifier, but with HTTP it's not necessarily the case. Since watch::send() fails if there are no receivers, we hereby switch to send_replace(), which successfully updates the value even if there are no active receivers.
HTTP replication will now react to a SnapshotRequired error by just sending all frames from a snapshot to the user. That's prone to overcommitting memory, but better than giving up. This change should be followed up by streaming the snapshot frames in multiple smaller chunks.
sqld/src/replication/http.rs
match (req.method(), req.uri().path()) {
    (&Method::GET, "/hello") => handle_hello(logger).await,
    (&Method::POST, "/frames") => handle_query(req, auth, logger).await,
    _ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
}
we slowly started migrating to Axum (see the admin API). I don't want to slow you down on other stuff you need to do, but if you have some time, could you use Axum instead, so we don't have to port that later 🙏
sure, I'll need to bootstrap myself with Axum first, but if we already have a precedent in the admin API, I'll start by reading that
k, done, with one minor quirk: Axum is a little overeager in refusing plaintext HTTP requests when we expect JSON (lack of proper Content-Type), so I decided to be a little more lenient and just parse the string at runtime in case it's valid JSON after all.
    for _ in 0..MAX_FRAMES_IN_SINGLE_RESPONSE {
        use futures::StreamExt;

        match frame_stream.next().await {
            Some(Ok(frame)) => {
                tracing::trace!("Read frame {}", frame_stream.current_frame_no);
                frames.push(frame);
            }
            Some(Err(LogReadError::SnapshotRequired)) => {
                drop(frame_stream);
                if frames.is_empty() {
                    tracing::debug!("Snapshot required, switching to snapshot mode");
                    frames = load_snapshot(logger, next_offset)?;
                } else {
                    tracing::debug!("Snapshot required, but some frames were read - returning.");
                }
                break;
            }
            Some(Err(e)) => {
                tracing::error!("Error reading frame: {}", e);
                return Ok(Response::builder()
                    .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
                    .body(Body::empty())
                    .unwrap());
            }
            None => break,
        }

        if frame_stream.max_available_frame_no <= frame_stream.current_frame_no {
            break;
        }
    }

    if frames.is_empty() {
        return Ok(Response::builder()
            .status(hyper::StatusCode::NO_CONTENT)
            .body(Body::empty())
            .unwrap());
    }

    Ok(Response::builder()
        .status(hyper::StatusCode::OK)
        .body(Body::from(serde_json::to_string(&frames)?))
        .unwrap())
}
wdyt about streaming frames instead? https://docs.rs/hyper/latest/hyper/body/struct.Body.html
ah yeah, makes perfect sense, will do
@MarinPostma I'm inclined to stall returning a stream here, because of the current convoluted logic when a snapshot is needed - then, we either return whatever frames we have so far, or just ship the whole snapshot, and that whole magic logic would need to be wrapped in a single struct that implements futures::Stream. Doable of course, but perhaps in iteration 2?
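To make the "single struct" idea a bit more concrete, here is a rough synchronous sketch that uses std's Iterator in place of futures::Stream; an async version would presumably follow the same state machine. Frame, LogReadError, State, FrameSource and the load_snapshot closure are illustrative stand-ins, not the actual sqld types.

```rust
/// Hypothetical stand-in for a replication frame.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    frame_no: u64,
}

/// Hypothetical mirror of the error kinds the handler distinguishes.
#[derive(Debug, PartialEq)]
enum LogReadError {
    SnapshotRequired,
    Other(String),
}

/// Where the next frame comes from.
enum State<L> {
    Log(L),
    Snapshot(std::vec::IntoIter<Frame>),
    Done,
}

/// Wraps the "log frames, or else the whole snapshot" logic in one type.
struct FrameSource<L, S> {
    state: State<L>,
    load_snapshot: Option<S>,
    yielded_any: bool,
}

impl<L, S> FrameSource<L, S>
where
    L: Iterator<Item = Result<Frame, LogReadError>>,
    S: FnOnce() -> Result<Vec<Frame>, LogReadError>,
{
    fn new(log: L, load_snapshot: S) -> Self {
        Self {
            state: State::Log(log),
            load_snapshot: Some(load_snapshot),
            yielded_any: false,
        }
    }
}

impl<L, S> Iterator for FrameSource<L, S>
where
    L: Iterator<Item = Result<Frame, LogReadError>>,
    S: FnOnce() -> Result<Vec<Frame>, LogReadError>,
{
    type Item = Result<Frame, LogReadError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Pull from the current source; only the log can need a fallback.
        let item = match &mut self.state {
            State::Log(log) => log.next(),
            State::Snapshot(frames) => return frames.next().map(Ok),
            State::Done => return None,
        };
        match item {
            Some(Ok(frame)) => {
                self.yielded_any = true;
                Some(Ok(frame))
            }
            // Nothing served yet: switch to shipping the snapshot.
            Some(Err(LogReadError::SnapshotRequired)) if !self.yielded_any => {
                match self.load_snapshot.take().map(|load| load()) {
                    Some(Ok(frames)) => {
                        let mut frames = frames.into_iter();
                        let first = frames.next().map(Ok);
                        self.state = State::Snapshot(frames);
                        first
                    }
                    Some(Err(e)) => {
                        self.state = State::Done;
                        Some(Err(e))
                    }
                    None => {
                        self.state = State::Done;
                        None
                    }
                }
            }
            // Some frames were already served, or the log is exhausted: stop.
            Some(Err(LogReadError::SnapshotRequired)) | None => {
                self.state = State::Done;
                None
            }
            Some(Err(e)) => {
                self.state = State::Done;
                Some(Err(e))
            }
        }
    }
}
```

The cap on frames per response is deliberately left out here; with an iterator it could be applied from the outside with `.take(n)`.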
Following the example in admin_api.rs
This draft adds an experimental endpoint which serves frames for replication purposes. Its capabilities are:
- /hello endpoint
- /frames endpoint

And that's about it. For simplicity, when frames can't be served from a FrameStream and instead need to be loaded from a snapshot file, the entire snapshot is sent in a single response. That's a limitation, but it's also why the endpoint is considered experimental. For the same reason, it is right now implemented as a separate service running on a dedicated port (e.g. 8081), so that it's isolated from user workloads.
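As a rough illustration of that fallback, stripped of HTTP and async details: the sketch below collects a capped batch from the log and only falls back to the full snapshot when nothing has been read yet. Frame, LogReadError, collect_batch and the cap value are stand-ins chosen for the example, and the end of the iterator stands in for "no more frames available"; the real handler works on a FrameStream.

```rust
/// Hypothetical stand-in for a replication frame.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    frame_no: u64,
}

/// Hypothetical mirror of the error kinds the handler distinguishes.
#[derive(Debug, PartialEq)]
enum LogReadError {
    SnapshotRequired,
    Other(String),
}

/// Arbitrary cap on frames per response (the value here is an assumption).
const MAX_FRAMES_IN_SINGLE_RESPONSE: usize = 4;

/// Collects at most MAX_FRAMES_IN_SINGLE_RESPONSE frames from `log`.
/// If the log reports SnapshotRequired before any frame was read, the whole
/// snapshot is returned instead (uncapped); if some frames were already
/// read, those are returned as-is.
fn collect_batch<I, S>(log: I, load_snapshot: S) -> Result<Vec<Frame>, LogReadError>
where
    I: IntoIterator<Item = Result<Frame, LogReadError>>,
    S: FnOnce() -> Result<Vec<Frame>, LogReadError>,
{
    let mut frames = Vec::new();
    for item in log.into_iter().take(MAX_FRAMES_IN_SINGLE_RESPONSE) {
        match item {
            Ok(frame) => frames.push(frame),
            Err(LogReadError::SnapshotRequired) => {
                if frames.is_empty() {
                    // Nothing read yet: ship the whole snapshot.
                    return load_snapshot();
                }
                // Some frames were read: return what we have so far.
                break;
            }
            Err(e) => return Err(e),
        }
    }
    Ok(frames)
}
```

Note that the snapshot branch bypasses the cap entirely, which is the memory overcommit risk mentioned above.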
This endpoint also does not account for any reads in the metrics, and it definitely should, as it's very read-heavy. We should also consider having a separate auth path for it, because its main purpose is to be used by replicas (possibly embedded), not for regular direct access to frames.
Testing setup for sqld:
curl localhost:8081/hello
curl -d '{"next_offset": 1}' localhost:8081/frames
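On the client side, a replica would presumably call /frames in a loop, advancing next_offset until the server answers 204 No Content. A minimal sketch of that loop follows, with the HTTP call replaced by a closure. Frame, FramesResponse and catch_up are hypothetical names, and the sketch assumes that frames are numbered consecutively, so that next_offset advances by the batch length; that may not hold when a snapshot is served.

```rust
/// Hypothetical stand-in for a replication frame.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    frame_no: u64,
}

/// Outcome of one POST /frames call, modelled after the handler's two
/// success paths (error statuses are left out of this sketch).
enum FramesResponse {
    /// 200 OK with a batch of frames.
    Frames(Vec<Frame>),
    /// 204 No Content: nothing new at this offset.
    NoContent,
}

/// Repeatedly asks for frames starting at `next_offset` until the server
/// has nothing more to send. Returns the new offset and the frames received.
/// `fetch` stands in for the HTTP request with body {"next_offset": N}.
fn catch_up<F>(mut next_offset: u64, mut fetch: F) -> (u64, Vec<Frame>)
where
    F: FnMut(u64) -> FramesResponse,
{
    let mut applied = Vec::new();
    loop {
        match fetch(next_offset) {
            FramesResponse::Frames(batch) if !batch.is_empty() => {
                // Assumption: consecutive numbering, so advance by batch size.
                next_offset += batch.len() as u64;
                applied.extend(batch);
            }
            // 204, or an unexpectedly empty batch: we are caught up.
            _ => return (next_offset, applied),
        }
    }
}
```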
HTTP replication is also used in this experimental project, and in particular it can be tested via
cargo run --example replica
from the libsql/crates/core source directory.