Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit a528e9e

Browse files
authored
Merge pull request #554 from libsql/mt-read-from-snapshot
multi-tenancy: replicate from snapshots
2 parents abace0d + 03187a4 commit a528e9e

File tree

10 files changed

+194
-115
lines changed

10 files changed

+194
-115
lines changed

libsqlx-server/assets/test/simple-log

20.2 KB
Binary file not shown.

libsqlx-server/src/allocation/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl Database {
103103
Compactor::new(
104104
max_log_size,
105105
replication_log_compact_interval,
106-
compaction_queue,
106+
compaction_queue.clone(),
107107
database_id,
108108
),
109109
false,
@@ -124,6 +124,7 @@ impl Database {
124124
db: Arc::new(db),
125125
replica_streams: HashMap::new(),
126126
frame_notifier: receiver,
127+
snapshot_store: compaction_queue.snapshot_store.clone(),
127128
},
128129
compact_interval,
129130
}
@@ -275,6 +276,7 @@ impl Allocation {
275276
db,
276277
replica_streams,
277278
frame_notifier,
279+
snapshot_store,
278280
..
279281
},
280282
..
@@ -289,6 +291,7 @@ impl Allocation {
289291
dipatcher: self.dispatcher.clone() as _,
290292
notifier: frame_notifier.clone(),
291293
buffer: Vec::new(),
294+
snapshot_store: snapshot_store.clone(),
292295
};
293296

294297
match replica_streams.entry(msg.from) {

libsqlx-server/src/allocation/primary/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ use std::collections::HashMap;
22
use std::mem::size_of;
33
use std::sync::Arc;
44
use std::task::{Context, Poll};
5+
use std::time::Duration;
56

67
use bytes::Bytes;
78
use libsqlx::libsql::{LibsqlDatabase, PrimaryType};
89
use libsqlx::result_builder::ResultBuilder;
9-
use libsqlx::{FrameNo, LogReadError, ReplicationLogger};
10+
use libsqlx::{Frame, FrameHeader, FrameNo, LogReadError, ReplicationLogger};
1011
use tokio::task::block_in_place;
1112

1213
use crate::linc::bus::Dispatch;
1314
use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, StepError, Value};
1415
use crate::linc::{Inbound, NodeId, Outbound};
1516
use crate::meta::DatabaseId;
17+
use crate::snapshot_store::SnapshotStore;
1618

1719
use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT};
1820

@@ -24,6 +26,7 @@ pub struct PrimaryDatabase {
2426
pub db: Arc<LibsqlDatabase<PrimaryType>>,
2527
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
2628
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
29+
pub snapshot_store: Arc<SnapshotStore>,
2730
}
2831

2932
pub struct ProxyResponseBuilder {
@@ -206,6 +209,7 @@ pub struct FrameStreamer {
206209
pub dipatcher: Arc<dyn Dispatch>,
207210
pub notifier: tokio::sync::watch::Receiver<FrameNo>,
208211
pub buffer: Vec<Bytes>,
212+
pub snapshot_store: Arc<SnapshotStore>,
209213
}
210214

211215
impl FrameStreamer {
@@ -234,7 +238,53 @@ impl FrameStreamer {
234238
}
235239
}
236240
Err(LogReadError::Error(_)) => todo!("handle log read error"),
237-
Err(LogReadError::SnapshotRequired) => todo!("handle reading from snapshot"),
241+
Err(LogReadError::SnapshotRequired) => self.send_snapshot().await,
242+
}
243+
}
244+
}
245+
246+
async fn send_snapshot(&mut self) {
247+
tracing::debug!("sending frames from snapshot");
248+
loop {
249+
match self
250+
.snapshot_store
251+
.locate_file(self.database_id, self.next_frame_no)
252+
{
253+
Some(file) => {
254+
let mut iter = file.frames_iter_from(self.next_frame_no).peekable();
255+
256+
while let Some(frame) = block_in_place(|| iter.next()) {
257+
let frame = frame.unwrap();
258+
// TODO: factorize in maybe_send
259+
if self.buffer.len() > FRAMES_MESSAGE_MAX_COUNT {
260+
self.send_frames().await;
261+
}
262+
let size_after = iter
263+
.peek()
264+
.is_none()
265+
.then_some(file.header.size_after)
266+
.unwrap_or(0);
267+
let frame = Frame::from_parts(
268+
&FrameHeader {
269+
frame_no: frame.header().frame_no,
270+
page_no: frame.header().page_no,
271+
size_after,
272+
},
273+
frame.page(),
274+
);
275+
self.next_frame_no = frame.header().frame_no + 1;
276+
self.buffer.push(frame.bytes());
277+
278+
tokio::task::yield_now().await;
279+
}
280+
281+
break;
282+
}
283+
None => {
284+
// snapshot is not ready yet, wait a bit
285+
// FIXME: notify when snapshot becomes ready instead of using loop
286+
tokio::time::sleep(Duration::from_millis(100)).await;
287+
}
238288
}
239289
}
240290
}

libsqlx-server/src/compactor.rs

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::borrow::Cow;
12
use std::fs::File;
23
use std::io::{BufWriter, Write};
34
use std::mem::size_of;
@@ -8,7 +9,7 @@ use std::sync::{
89
Arc,
910
};
1011

11-
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
12+
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
1213
use bytes::{Bytes, BytesMut};
1314
use heed::byteorder::BigEndian;
1415
use heed_types::{SerdeBincode, U64};
@@ -37,7 +38,7 @@ pub struct CompactionQueue {
3738
next_id: AtomicU64,
3839
notify: watch::Sender<Option<u64>>,
3940
db_path: PathBuf,
40-
snapshot_store: Arc<SnapshotStore>,
41+
pub snapshot_store: Arc<SnapshotStore>,
4142
}
4243

4344
impl CompactionQueue {
@@ -109,9 +110,17 @@ impl CompactionQueue {
109110
let to_compact_path = to_compact_path.clone();
110111
let db_path = self.db_path.clone();
111112
move || {
112-
let mut builder = SnapshotBuilder::new(&db_path, job.database_id, job.log_id)?;
113113
let log = LogFile::new(to_compact_path)?;
114-
for frame in log.rev_deduped() {
114+
let (start_fno, end_fno, iter) =
115+
log.rev_deduped().expect("compaction job with no frames!");
116+
let mut builder = SnapshotBuilder::new(
117+
&db_path,
118+
job.database_id,
119+
job.log_id,
120+
start_fno,
121+
end_fno,
122+
)?;
123+
for frame in iter {
115124
let frame = frame?;
116125
builder.push_frame(frame)?;
117126
}
@@ -168,8 +177,50 @@ pub struct SnapshotBuilder {
168177
last_seen_frame_no: u64,
169178
}
170179

180+
#[derive(Debug, Clone, Copy, Pod, Zeroable)]
181+
#[repr(C)]
182+
pub struct SnapshotFrameHeader {
183+
pub frame_no: FrameNo,
184+
pub page_no: u32,
185+
_pad: u32,
186+
}
187+
188+
#[derive(Clone)]
189+
pub struct SnapshotFrame {
190+
data: Bytes,
191+
}
192+
193+
impl SnapshotFrame {
194+
const SIZE: usize = size_of::<SnapshotFrameHeader>() + 4096;
195+
196+
pub fn try_from_bytes(data: Bytes) -> crate::Result<Self> {
197+
if data.len() != Self::SIZE {
198+
color_eyre::eyre::bail!("invalid snapshot frame")
199+
}
200+
201+
Ok(Self { data })
202+
}
203+
204+
pub fn header(&self) -> Cow<SnapshotFrameHeader> {
205+
let data = &self.data[..size_of::<SnapshotFrameHeader>()];
206+
try_from_bytes(data)
207+
.map(Cow::Borrowed)
208+
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
209+
}
210+
211+
pub(crate) fn page(&self) -> &[u8] {
212+
&self.data[size_of::<SnapshotFrameHeader>()..]
213+
}
214+
}
215+
171216
impl SnapshotBuilder {
172-
pub fn new(db_path: &Path, db_id: DatabaseId, snapshot_id: Uuid) -> color_eyre::Result<Self> {
217+
pub fn new(
218+
db_path: &Path,
219+
db_id: DatabaseId,
220+
snapshot_id: Uuid,
221+
start_fno: FrameNo,
222+
end_fno: FrameNo,
223+
) -> color_eyre::Result<Self> {
173224
let temp_dir = db_path.join("tmp");
174225
let mut target = BufWriter::new(NamedTempFile::new_in(&temp_dir)?);
175226
// reserve header space
@@ -178,8 +229,8 @@ impl SnapshotBuilder {
178229
Ok(Self {
179230
header: SnapshotFileHeader {
180231
db_id,
181-
start_frame_no: u64::MAX,
182-
end_frame_no: u64::MIN,
232+
start_frame_no: start_fno,
233+
end_frame_no: end_fno,
183234
frame_count: 0,
184235
size_after: 0,
185236
_pad: 0,
@@ -194,16 +245,20 @@ impl SnapshotBuilder {
194245
pub fn push_frame(&mut self, frame: Frame) -> color_eyre::Result<()> {
195246
assert!(frame.header().frame_no < self.last_seen_frame_no);
196247
self.last_seen_frame_no = frame.header().frame_no;
197-
if frame.header().frame_no < self.header.start_frame_no {
198-
self.header.start_frame_no = frame.header().frame_no;
199-
}
200248

201-
if frame.header().frame_no > self.header.end_frame_no {
202-
self.header.end_frame_no = frame.header().frame_no;
249+
if frame.header().frame_no == self.header.end_frame_no {
203250
self.header.size_after = frame.header().size_after;
204251
}
205252

206-
self.snapshot_file.write_all(frame.as_slice())?;
253+
let header = SnapshotFrameHeader {
254+
frame_no: frame.header().frame_no,
255+
page_no: frame.header().page_no,
256+
_pad: 0,
257+
};
258+
259+
self.snapshot_file.write_all(bytes_of(&header))?;
260+
self.snapshot_file.write_all(frame.page())?;
261+
207262
self.header.frame_count += 1;
208263

209264
Ok(())
@@ -241,18 +296,18 @@ impl SnapshotFile {
241296
}
242297

243298
/// Iterator on the frames contained in the snapshot file, in reverse frame_no order.
244-
pub fn frames_iter(&self) -> impl Iterator<Item = libsqlx::Result<Bytes>> + '_ {
299+
pub fn frames_iter(&self) -> impl Iterator<Item = crate::Result<SnapshotFrame>> + '_ {
245300
let mut current_offset = 0;
246301
std::iter::from_fn(move || {
247302
if current_offset >= self.header.frame_count {
248303
return None;
249304
}
250305
let read_offset = size_of::<SnapshotFileHeader>() as u64
251-
+ current_offset * LogFile::FRAME_SIZE as u64;
306+
+ current_offset * SnapshotFrame::SIZE as u64;
252307
current_offset += 1;
253-
let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE);
308+
let mut buf = BytesMut::zeroed(SnapshotFrame::SIZE);
254309
match self.file.read_exact_at(&mut buf, read_offset as _) {
255-
Ok(_) => Some(Ok(buf.freeze())),
310+
Ok(_) => Some(Ok(SnapshotFrame { data: buf.freeze() })),
256311
Err(e) => Some(Err(e.into())),
257312
}
258313
})
@@ -262,19 +317,16 @@ impl SnapshotFile {
262317
pub fn frames_iter_from(
263318
&self,
264319
frame_no: u64,
265-
) -> impl Iterator<Item = libsqlx::Result<Bytes>> + '_ {
320+
) -> impl Iterator<Item = crate::Result<SnapshotFrame>> + '_ {
266321
let mut iter = self.frames_iter();
267322
std::iter::from_fn(move || match iter.next() {
268-
Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) {
269-
Ok(frame) => {
270-
if frame.header().frame_no < frame_no {
271-
None
272-
} else {
273-
Some(Ok(bytes))
274-
}
323+
Some(Ok(frame)) => {
324+
if frame.header().frame_no < frame_no {
325+
None
326+
} else {
327+
Some(Ok(frame))
275328
}
276-
Err(e) => Some(Err(e)),
277-
},
329+
}
278330
other => other,
279331
})
280332
}
@@ -331,20 +383,23 @@ mod test {
331383
let snapshot_file = SnapshotFile::open(&snapshot_path).unwrap();
332384
assert_eq!(snapshot_file.header.start_frame_no, expected_start_frameno);
333385
assert_eq!(snapshot_file.header.end_frame_no, expected_end_frameno);
334-
assert!(snapshot_file.frames_iter().all(|f| expected_page_content
335-
.remove(&Frame::try_from_bytes(f.unwrap()).unwrap().header().page_no)));
386+
assert!(snapshot_file
387+
.frames_iter()
388+
.all(|f| expected_page_content.remove(&f.unwrap().header().page_no)));
336389
assert!(expected_page_content.is_empty());
337390

338-
assert_eq!(snapshot_file
339-
.frames_iter()
340-
.map(Result::unwrap)
341-
.map(Frame::try_from_bytes)
342-
.map(Result::unwrap)
343-
.map(|f| f.header().frame_no)
344-
.reduce(|prev, new| {
345-
assert!(new < prev);
346-
new
347-
}).unwrap(), 0);
391+
assert_eq!(
392+
snapshot_file
393+
.frames_iter()
394+
.map(Result::unwrap)
395+
.map(|f| f.header().frame_no)
396+
.reduce(|prev, new| {
397+
assert!(new < prev);
398+
new
399+
})
400+
.unwrap(),
401+
0
402+
);
348403

349404
assert_eq!(store.locate(database_id, 0).unwrap().snapshot_id, log_id);
350405
}

libsqlx-server/src/snapshot_store.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ use std::mem::size_of;
22
use std::path::PathBuf;
33

44
use bytemuck::{Pod, Zeroable};
5-
use heed::BytesDecode;
6-
use heed_types::{ByteSlice, CowType, SerdeBincode};
5+
use heed_types::{CowType, SerdeBincode};
76
use libsqlx::FrameNo;
87
use serde::{Deserialize, Serialize};
98
use tokio::task::block_in_place;
109
use uuid::Uuid;
1110

12-
use crate::meta::DatabaseId;
11+
use crate::{compactor::SnapshotFile, meta::DatabaseId};
1312

1413
#[derive(Clone, Copy, Zeroable, Pod, Debug)]
1514
#[repr(transparent)]
@@ -92,6 +91,10 @@ impl SnapshotStore {
9291
end_frame_no: u64::MAX.into(),
9392
};
9493

94+
for entry in self.database.lazily_decode_data().iter(&txn).unwrap() {
95+
let (k, _) = entry.unwrap();
96+
}
97+
9598
match self
9699
.database
97100
.get_lower_than_or_equal_to(&txn, &key)
@@ -103,6 +106,11 @@ impl SnapshotStore {
103106
} else if frame_no >= key.start_frame_no.into()
104107
&& frame_no <= key.end_frame_no.into()
105108
{
109+
tracing::debug!(
110+
"found snapshot for {frame_no}; {}-{}",
111+
u64::from(key.start_frame_no),
112+
u64::from(key.end_frame_no)
113+
);
106114
return Some(v);
107115
} else {
108116
None
@@ -111,6 +119,15 @@ impl SnapshotStore {
111119
Err(_) => todo!(),
112120
}
113121
}
122+
123+
pub fn locate_file(&self, database_id: DatabaseId, frame_no: FrameNo) -> Option<SnapshotFile> {
124+
let meta = self.locate(database_id, frame_no)?;
125+
let path = self
126+
.db_path
127+
.join("snapshots")
128+
.join(meta.snapshot_id.to_string());
129+
Some(SnapshotFile::open(&path).unwrap())
130+
}
114131
}
115132

116133
#[cfg(test)]

0 commit comments

Comments (0)