Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 047e744

Browse files
committed
replication: add HTTP endpoint skeleton
1 parent 4355422 commit 047e744

File tree

4 files changed

+111
-0
lines changed

4 files changed

+111
-0
lines changed

sqld/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub struct Config {
104104
pub allow_replica_overwrite: bool,
105105
pub max_response_size: u64,
106106
pub snapshot_exec: Option<String>,
107+
pub http_replication_addr: Option<SocketAddr>,
107108
}
108109

109110
impl Default for Config {
@@ -143,6 +144,7 @@ impl Default for Config {
143144
allow_replica_overwrite: false,
144145
max_response_size: 10 * 1024 * 1024, // 10MiB
145146
snapshot_exec: None,
147+
http_replication_addr: None,
146148
}
147149
}
148150
}
@@ -500,6 +502,11 @@ async fn start_primary(
500502
));
501503
}
502504

505+
if let Some(ref addr) = config.http_replication_addr {
506+
let auth = get_auth(config)?;
507+
join_set.spawn(replication::http::run(auth, *addr));
508+
}
509+
503510
run_service(
504511
db_factory,
505512
config,

sqld/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ struct Cli {
183183
/// Set a command to execute when a snapshot file is generated.
184184
#[clap(long, env = "SQLD_SNAPSHOT_EXEC")]
185185
snapshot_exec: Option<String>,
186+
187+
/// The address and port for the replication HTTP API.
188+
#[clap(long, env = "SQLD_HTTP_REPLICATION_LISTEN_ADDR")]
189+
http_replication_listen_addr: Option<SocketAddr>,
186190
}
187191

188192
#[derive(clap::Subcommand, Debug)]
@@ -292,6 +296,7 @@ fn config_from_args(args: Cli) -> Result<Config> {
292296
allow_replica_overwrite: args.allow_replica_overwrite,
293297
max_response_size: args.max_response_size.0,
294298
snapshot_exec: args.snapshot_exec,
299+
http_replication_addr: args.http_replication_listen_addr,
295300
})
296301
}
297302

sqld/src/replication/http.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use crate::Auth;
2+
use anyhow::{Context, Result};
3+
use hyper::server::conn::AddrIncoming;
4+
use hyper::{Body, Method, Request, Response};
5+
use std::net::SocketAddr;
6+
use std::sync::Arc;
7+
use tower::ServiceBuilder;
8+
use tower_http::trace::DefaultOnResponse;
9+
use tower_http::{compression::CompressionLayer, cors};
10+
use tracing::{Level, Span};
11+
12+
pub(crate) async fn run(auth: Arc<Auth>, addr: SocketAddr) -> Result<()> {
13+
tracing::info!("listening for HTTP requests on {addr}");
14+
15+
fn trace_request<B>(req: &Request<B>, _span: &Span) {
16+
tracing::debug!("got request: {} {}", req.method(), req.uri());
17+
}
18+
let service = ServiceBuilder::new()
19+
.layer(
20+
tower_http::trace::TraceLayer::new_for_http()
21+
.on_request(trace_request)
22+
.on_response(
23+
DefaultOnResponse::new()
24+
.level(Level::DEBUG)
25+
.latency_unit(tower_http::LatencyUnit::Micros),
26+
),
27+
)
28+
.layer(CompressionLayer::new())
29+
.layer(
30+
cors::CorsLayer::new()
31+
.allow_methods(cors::AllowMethods::any())
32+
.allow_headers(cors::Any)
33+
.allow_origin(cors::Any),
34+
)
35+
.service_fn(move |req| {
36+
let auth = auth.clone();
37+
handle_request(auth, req)
38+
});
39+
40+
let listener = tokio::net::TcpListener::bind(&addr).await?;
41+
let server = hyper::server::Server::builder(AddrIncoming::from_listener(listener)?)
42+
.tcp_nodelay(true)
43+
.serve(tower::make::Shared::new(service));
44+
45+
server.await.context("Http server exited with an error")?;
46+
47+
Ok(())
48+
}
49+
50+
async fn handle_request(auth: Arc<Auth>, req: Request<Body>) -> Result<Response<Body>> {
51+
let auth_header = req.headers().get(hyper::header::AUTHORIZATION);
52+
let auth = match auth.authenticate_http(auth_header) {
53+
Ok(auth) => auth,
54+
Err(err) => {
55+
return Ok(Response::builder()
56+
.status(hyper::StatusCode::UNAUTHORIZED)
57+
.body(err.to_string().into())
58+
.unwrap());
59+
}
60+
};
61+
62+
match (req.method(), req.uri().path()) {
63+
(&Method::POST, "/frames") => handle_query(req, auth).await,
64+
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
65+
}
66+
}
67+
68+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
69+
pub struct FramesRequest {
70+
pub next_offset: u64,
71+
}
72+
73+
fn error(msg: &str, code: hyper::StatusCode) -> Response<Body> {
74+
let err = serde_json::json!({ "error": msg });
75+
Response::builder()
76+
.status(code)
77+
.body(Body::from(serde_json::to_vec(&err).unwrap()))
78+
.unwrap()
79+
}
80+
81+
async fn handle_query(
82+
mut req: Request<Body>,
83+
_auth: crate::auth::Authenticated,
84+
) -> Result<Response<Body>> {
85+
let bytes = hyper::body::to_bytes(req.body_mut()).await?;
86+
let FramesRequest { next_offset } = match serde_json::from_slice(&bytes) {
87+
Ok(req) => req,
88+
Err(resp) => return Ok(error(&resp.to_string(), hyper::StatusCode::BAD_REQUEST)),
89+
};
90+
91+
Ok(Response::builder()
92+
.status(hyper::StatusCode::OK)
93+
.body(Body::from(format!(
94+
"{{\"comment\":\"thx for sending the request\", \"next_offset\":{}}}",
95+
next_offset + 1
96+
)))
97+
.unwrap())
98+
}

sqld/src/replication/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod frame;
2+
pub mod http;
23
pub mod primary;
34
pub mod replica;
45
mod snapshot;

0 commit comments

Comments
 (0)