Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 8da8af7

Browse files
committed
replication: add HTTP implementation for /frames endpoint
It still does not perform any kind of handshake, just asks for frames starting N. Tested with: $ cargo run -- --http-replication-listen-addr 127.0.0.1:8081 $ curl -d '{"next_offset": 0}' -v localhost:8081/frames
1 parent 047e744 commit 8da8af7

File tree

4 files changed

+82
-13
lines changed

4 files changed

+82
-13
lines changed

sqld/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,14 +497,14 @@ async fn start_primary(
497497
config.rpc_server_key.clone(),
498498
config.rpc_server_ca_cert.clone(),
499499
db_factory.clone(),
500-
logger,
500+
logger.clone(),
501501
idle_shutdown_layer.clone(),
502502
));
503503
}
504504

505505
if let Some(ref addr) = config.http_replication_addr {
506506
let auth = get_auth(config)?;
507-
join_set.spawn(replication::http::run(auth, *addr));
507+
join_set.spawn(replication::http::run(auth, *addr, logger));
508508
}
509509

510510
run_service(

sqld/src/replication/frame.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub struct FrameHeader {
2727
pub size_after: u32,
2828
}
2929

30-
#[derive(Clone)]
30+
#[derive(Clone, serde::Serialize, serde::Deserialize)]
3131
/// The owned version of a replication frame.
3232
/// Cloning this is cheap.
3333
pub struct Frame {

sqld/src/replication/http.rs

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::replication::{frame::Frame, primary::frame_stream::FrameStream, ReplicationLogger};
12
use crate::Auth;
23
use anyhow::{Context, Result};
34
use hyper::server::conn::AddrIncoming;
@@ -9,7 +10,11 @@ use tower_http::trace::DefaultOnResponse;
910
use tower_http::{compression::CompressionLayer, cors};
1011
use tracing::{Level, Span};
1112

12-
pub(crate) async fn run(auth: Arc<Auth>, addr: SocketAddr) -> Result<()> {
13+
pub(crate) async fn run(
14+
auth: Arc<Auth>,
15+
addr: SocketAddr,
16+
logger: Arc<ReplicationLogger>,
17+
) -> Result<()> {
1318
tracing::info!("listening for HTTP requests on {addr}");
1419

1520
fn trace_request<B>(req: &Request<B>, _span: &Span) {
@@ -34,7 +39,8 @@ pub(crate) async fn run(auth: Arc<Auth>, addr: SocketAddr) -> Result<()> {
3439
)
3540
.service_fn(move |req| {
3641
let auth = auth.clone();
37-
handle_request(auth, req)
42+
let logger = logger.clone();
43+
handle_request(auth, req, logger)
3844
});
3945

4046
let listener = tokio::net::TcpListener::bind(&addr).await?;
@@ -47,7 +53,11 @@ pub(crate) async fn run(auth: Arc<Auth>, addr: SocketAddr) -> Result<()> {
4753
Ok(())
4854
}
4955

50-
async fn handle_request(auth: Arc<Auth>, req: Request<Body>) -> Result<Response<Body>> {
56+
async fn handle_request(
57+
auth: Arc<Auth>,
58+
req: Request<Body>,
59+
logger: Arc<ReplicationLogger>,
60+
) -> Result<Response<Body>> {
5161
let auth_header = req.headers().get(hyper::header::AUTHORIZATION);
5262
let auth = match auth.authenticate_http(auth_header) {
5363
Ok(auth) => auth,
@@ -60,7 +70,7 @@ async fn handle_request(auth: Arc<Auth>, req: Request<Body>) -> Result<Response<
6070
};
6171

6272
match (req.method(), req.uri().path()) {
63-
(&Method::POST, "/frames") => handle_query(req, auth).await,
73+
(&Method::POST, "/frames") => handle_query(req, auth, logger).await,
6474
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
6575
}
6676
}
@@ -70,6 +80,25 @@ pub struct FramesRequest {
7080
pub next_offset: u64,
7181
}
7282

83+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
84+
pub struct Frames {
85+
pub frames: Vec<Frame>,
86+
}
87+
88+
impl Frames {
89+
pub fn new() -> Self {
90+
Self { frames: Vec::new() }
91+
}
92+
93+
pub fn push(&mut self, frame: Frame) {
94+
self.frames.push(frame);
95+
}
96+
97+
pub fn is_empty(&self) -> bool {
98+
self.frames.is_empty()
99+
}
100+
}
101+
73102
fn error(msg: &str, code: hyper::StatusCode) -> Response<Body> {
74103
let err = serde_json::json!({ "error": msg });
75104
Response::builder()
@@ -81,18 +110,58 @@ fn error(msg: &str, code: hyper::StatusCode) -> Response<Body> {
81110
async fn handle_query(
82111
mut req: Request<Body>,
83112
_auth: crate::auth::Authenticated,
113+
logger: Arc<ReplicationLogger>,
84114
) -> Result<Response<Body>> {
85115
let bytes = hyper::body::to_bytes(req.body_mut()).await?;
86116
let FramesRequest { next_offset } = match serde_json::from_slice(&bytes) {
87117
Ok(req) => req,
88118
Err(resp) => return Ok(error(&resp.to_string(), hyper::StatusCode::BAD_REQUEST)),
89119
};
90120

121+
let mut frame_stream = FrameStream::new(logger, next_offset);
122+
123+
if frame_stream.max_available_frame_no < next_offset {
124+
tracing::trace!("No frames available starting {next_offset}, returning 204 No Content");
125+
return Ok(Response::builder()
126+
.status(hyper::StatusCode::NO_CONTENT)
127+
.body(Body::empty())
128+
.unwrap());
129+
}
130+
131+
let mut frames = Frames::new();
132+
loop {
133+
use futures::StreamExt;
134+
135+
match frame_stream.next().await {
136+
Some(Ok(frame)) => {
137+
tracing::trace!("Read frame {}", frame_stream.current_frame_no);
138+
frames.push(frame);
139+
}
140+
Some(Err(e)) => {
141+
tracing::error!("Error reading frame: {}", e);
142+
return Ok(Response::builder()
143+
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
144+
.body(Body::empty())
145+
.unwrap());
146+
}
147+
None => break,
148+
}
149+
150+
// FIXME: also stop when we have enough frames to fill a large buffer
151+
if frame_stream.max_available_frame_no <= frame_stream.current_frame_no {
152+
break;
153+
}
154+
}
155+
156+
if frames.is_empty() {
157+
return Ok(Response::builder()
158+
.status(hyper::StatusCode::NO_CONTENT)
159+
.body(Body::empty())
160+
.unwrap());
161+
}
162+
91163
Ok(Response::builder()
92164
.status(hyper::StatusCode::OK)
93-
.body(Body::from(format!(
94-
"{{\"comment\":\"thx for sending the request\", \"next_offset\":{}}}",
95-
next_offset + 1
96-
)))
165+
.body(Body::from(serde_json::to_string(&frames)?))
97166
.unwrap())
98167
}

sqld/src/replication/primary/frame_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use crate::replication::{FrameNo, LogReadError, ReplicationLogger};
1111
/// Streams frames from the replication log starting at `current_frame_no`.
1212
/// Only stops if the current frame is not in the log anymore.
1313
pub struct FrameStream {
14-
current_frame_no: FrameNo,
15-
max_available_frame_no: FrameNo,
14+
pub(crate) current_frame_no: FrameNo,
15+
pub(crate) max_available_frame_no: FrameNo,
1616
logger: Arc<ReplicationLogger>,
1717
state: FrameStreamState,
1818
}

0 commit comments

Comments
 (0)