Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 855a0e8

Browse files
authored
Merge pull request #561 from libsql/mt-store-replica-applied-frameno
mt store replica applied frameno
2 parents 0d8d86c + e6aaeb4 commit 855a0e8

File tree

12 files changed

+135
-82
lines changed

12 files changed

+135
-82
lines changed

libsqlx-server/src/allocation/mod.rs

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::linc::bus::Dispatch;
2727
use crate::linc::proto::{Frames, Message};
2828
use crate::linc::{Inbound, NodeId};
2929
use crate::meta::DatabaseId;
30+
use crate::replica_commit_store::ReplicaCommitStore;
3031

3132
use self::config::{AllocConfig, DbConfig};
3233
use self::primary::compactor::Compactor;
@@ -97,8 +98,14 @@ impl Database {
9798

9899
/// Returns the transaction timeout carried by this database.
///
/// Both the `Primary` and the `Replica` variant have a `transaction_timeout_duration`
/// field; whichever variant `self` is, that field is dereferenced and returned.
fn txn_timeout_duration(&self) -> Duration {
99100
match self {
// The removed (-) one-line arms and the added (+) multi-line arms below are the same
// patterns with the same result expression; only the layout differs.
100-
Database::Primary { transaction_timeout_duration, .. } => *transaction_timeout_duration,
101-
Database::Replica { transaction_timeout_duration, .. } => *transaction_timeout_duration,
101+
Database::Primary {
102+
transaction_timeout_duration,
103+
..
104+
} => *transaction_timeout_duration,
105+
Database::Replica {
106+
transaction_timeout_duration,
107+
..
108+
} => *transaction_timeout_duration,
102109
}
103110
}
104111
}
@@ -109,6 +116,7 @@ impl Database {
109116
path: PathBuf,
110117
dispatcher: Arc<dyn Dispatch>,
111118
compaction_queue: Arc<CompactionQueue>,
119+
replica_commit_store: Arc<ReplicaCommitStore>,
112120
) -> Self {
113121
let database_id = DatabaseId::from_name(&config.db_name);
114122

@@ -118,7 +126,7 @@ impl Database {
118126
replication_log_compact_interval,
119127
transaction_timeout_duration,
120128
} => {
121-
let (sender, receiver) = tokio::sync::watch::channel(0);
129+
let (sender, receiver) = tokio::sync::watch::channel(None);
122130
let db = LibsqlDatabase::new_primary(
123131
path,
124132
Compactor::new(
@@ -129,7 +137,7 @@ impl Database {
129137
),
130138
false,
131139
Box::new(move |fno| {
132-
let _ = sender.send(fno);
140+
let _ = sender.send(Some(fno));
133141
}),
134142
)
135143
.unwrap();
@@ -156,8 +164,22 @@ impl Database {
156164
proxy_request_timeout_duration,
157165
transaction_timeout_duration,
158166
} => {
159-
let rdb =
160-
LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap();
167+
let next_frame_no =
168+
block_in_place(|| replica_commit_store.get_commit_index(database_id))
169+
.map(|fno| fno + 1)
170+
.unwrap_or(0);
171+
172+
let commit_callback = Arc::new(move |fno| {
173+
replica_commit_store.commit(database_id, fno);
174+
});
175+
176+
let rdb = LibsqlDatabase::new_replica(
177+
path,
178+
MAX_INJECTOR_BUFFER_CAPACITY,
179+
commit_callback,
180+
)
181+
.unwrap();
182+
161183
let wdb = RemoteDb {
162184
proxy_request_timeout_duration,
163185
};
@@ -167,7 +189,7 @@ impl Database {
167189

168190
let replicator = Replicator::new(
169191
dispatcher,
170-
0,
192+
next_frame_no,
171193
database_id,
172194
primary_node_id,
173195
injector,
@@ -556,9 +578,10 @@ mod test {
556578
use tokio::sync::Notify;
557579

558580
use crate::allocation::replica::ReplicaConnection;
559-
use crate::init_dirs;
560581
use crate::linc::bus::Bus;
582+
use crate::replica_commit_store::ReplicaCommitStore;
561583
use crate::snapshot_store::SnapshotStore;
584+
use crate::{init_dirs, replica_commit_store};
562585

563586
use super::*;
564587

@@ -567,7 +590,8 @@ mod test {
567590
let bus = Arc::new(Bus::new(0, |_, _| async {}));
568591
let _queue = bus.connect(1); // pretend connection to node 1
569592
let tmp = tempfile::TempDir::new().unwrap();
570-
let read_db = LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, ()).unwrap();
593+
let read_db =
594+
LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, Arc::new(|_| ())).unwrap();
571595
let write_db = RemoteDb {
572596
proxy_request_timeout_duration: Duration::from_millis(100),
573597
};
@@ -631,16 +655,23 @@ mod test {
631655
},
632656
};
633657
let (sender, inbox) = mpsc::channel(10);
634-
let env = EnvOpenOptions::new().max_dbs(10).map_size(4096 * 100).open(tmp.path()).unwrap();
658+
let env = EnvOpenOptions::new()
659+
.max_dbs(10)
660+
.map_size(4096 * 100)
661+
.open(tmp.path())
662+
.unwrap();
635663
let store = Arc::new(SnapshotStore::new(tmp.path().to_path_buf(), env.clone()).unwrap());
636-
let queue = Arc::new(CompactionQueue::new(env, tmp.path().to_path_buf(), store).unwrap());
664+
let queue =
665+
Arc::new(CompactionQueue::new(env.clone(), tmp.path().to_path_buf(), store).unwrap());
666+
let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone()));
637667
let mut alloc = Allocation {
638668
inbox,
639669
database: Database::from_config(
640670
&config,
641671
tmp.path().to_path_buf(),
642672
bus.clone(),
643673
queue,
674+
replica_commit_store,
644675
),
645676
connections_futs: JoinSet::new(),
646677
next_conn_id: 0,
@@ -656,14 +687,18 @@ mod test {
656687

657688
let (snd, rcv) = oneshot::channel();
658689
let builder = StepResultsBuilder::new(snd);
659-
conn.execute(Program::seq(&["begin"]), Box::new(builder)).await.unwrap();
690+
conn.execute(Program::seq(&["begin"]), Box::new(builder))
691+
.await
692+
.unwrap();
660693
rcv.await.unwrap().unwrap();
661694

662695
tokio::time::sleep(Duration::from_secs(1)).await;
663696

664697
let (snd, rcv) = oneshot::channel();
665698
let builder = StepResultsBuilder::new(snd);
666-
conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder)).await.unwrap();
699+
conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder))
700+
.await
701+
.unwrap();
667702
assert!(rcv.await.unwrap().is_err());
668703
}
669704
}

libsqlx-server/src/allocation/primary/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb
2525
/// State held for a database running as primary.
pub struct PrimaryDatabase {
2626
/// Shared handle to the underlying libsql database in primary mode.
pub db: Arc<LibsqlDatabase<PrimaryType>>,
2727
/// One entry per node: a `u32` paired with the join handle of a spawned task.
/// NOTE(review): the `u32` is presumably the request number of that node's stream — confirm.
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
28-
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
28+
/// Watch receiver for frame numbers. With this change the value is `Option<FrameNo>`
/// rather than a bare `FrameNo`.
/// NOTE(review): presumably `None` means "no frame reported yet", matching the
/// `watch::channel(None)` created for the primary in `Database::from_config` — confirm.
pub frame_notifier: tokio::sync::watch::Receiver<Option<FrameNo>>,
2929
/// Shared handle to the snapshot store.
pub snapshot_store: Arc<SnapshotStore>,
3030
}
3131

@@ -207,7 +207,7 @@ pub struct FrameStreamer {
207207
pub req_no: u32,
208208
pub seq_no: u32,
209209
pub dipatcher: Arc<dyn Dispatch>,
210-
pub notifier: tokio::sync::watch::Receiver<FrameNo>,
210+
pub notifier: tokio::sync::watch::Receiver<Option<FrameNo>>,
211211
pub buffer: Vec<Bytes>,
212212
pub snapshot_store: Arc<SnapshotStore>,
213213
}
@@ -230,7 +230,7 @@ impl FrameStreamer {
230230
}
231231
if self
232232
.notifier
233-
.wait_for(|fno| *fno >= self.next_frame_no)
233+
.wait_for(|fno| fno.map(|f| f >= self.next_frame_no).unwrap_or(false))
234234
.await
235235
.is_err()
236236
{

libsqlx-server/src/allocation/replica.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ use tokio::task::block_in_place;
1616
use tokio::time::{timeout, Sleep};
1717

1818
use crate::linc::bus::Dispatch;
19-
use crate::linc::proto::{BuilderStep, ProxyResponse};
20-
use crate::linc::proto::{Enveloppe, Frames, Message};
21-
use crate::linc::Inbound;
22-
use crate::linc::{NodeId, Outbound};
19+
use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, ProxyResponse};
20+
use crate::linc::{Inbound, NodeId, Outbound};
2321
use crate::meta::DatabaseId;
2422

2523
use super::{ConnectionHandler, ConnectionMessage};
@@ -166,6 +164,7 @@ impl Replicator {
166164
});
167165
}
168166
}
167+
// no news from primary for the past 5 secs, send a request again
169168
Err(_) => self.query_replicate().await,
170169
Ok(None) => break,
171170
}

libsqlx-server/src/hrana/batch.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,15 @@ pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_ey
110110
let builder = StepResultsBuilder::new(snd);
111111
conn.execute(pgm, Box::new(builder)).await?;
112112

113-
rcv.await?.map_err(|e| anyhow!("{e}"))?.into_iter().try_for_each(|result| match result {
114-
StepResult::Ok => Ok(()),
115-
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
116-
Ok(stmt_err) => Err(anyhow!(stmt_err)),
117-
Err(sqld_err) => Err(anyhow!(sqld_err)),
118-
},
119-
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
120-
})
113+
rcv.await?
114+
.map_err(|e| anyhow!("{e}"))?
115+
.into_iter()
116+
.try_for_each(|result| match result {
117+
StepResult::Ok => Ok(()),
118+
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
119+
Ok(stmt_err) => Err(anyhow!(stmt_err)),
120+
Err(sqld_err) => Err(anyhow!(sqld_err)),
121+
},
122+
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
123+
})
121124
}

libsqlx-server/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use hyper::server::conn::AddrIncoming;
1212
use linc::bus::Bus;
1313
use manager::Manager;
1414
use meta::Store;
15+
use replica_commit_store::ReplicaCommitStore;
1516
use snapshot_store::SnapshotStore;
1617
use tokio::fs::create_dir_all;
1718
use tokio::net::{TcpListener, TcpStream};
@@ -28,6 +29,7 @@ mod http;
2829
mod linc;
2930
mod manager;
3031
mod meta;
32+
mod replica_commit_store;
3133
mod snapshot_store;
3234

3335
#[derive(Debug, Parser)]
@@ -123,11 +125,13 @@ async fn main() -> Result<()> {
123125
snapshot_store,
124126
)?);
125127
let store = Arc::new(Store::new(env.clone()));
128+
let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone()));
126129
let manager = Arc::new(Manager::new(
127130
config.db_path.clone(),
128131
store.clone(),
129132
100,
130133
compaction_queue.clone(),
134+
replica_commit_store,
131135
));
132136
let bus = Arc::new(Bus::new(config.cluster.id, manager.clone()));
133137

libsqlx-server/src/manager.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ use crate::linc::bus::Dispatch;
1414
use crate::linc::handler::Handler;
1515
use crate::linc::Inbound;
1616
use crate::meta::{DatabaseId, Store};
17+
use crate::replica_commit_store::ReplicaCommitStore;
1718

1819
/// Holds the shared resources that are handed to allocations, plus a cache of the
/// message senders of the allocations that are currently tracked.
pub struct Manager {
2021
/// Maps a database id to the sender half of that allocation's message channel.
/// Built with `Cache::new(max_conccurent_allocs)` in `Manager::new`.
cache: Cache<DatabaseId, mpsc::Sender<AllocationMessage>>,
2122
/// Shared metadata store, supplied by the caller of `Manager::new`.
meta_store: Arc<Store>,
2223
/// Path supplied by the caller of `Manager::new` (`config.db_path` in `main`).
db_path: PathBuf,
2324
/// Shared compaction queue; a clone is passed along when an allocation is built.
compaction_queue: Arc<CompactionQueue>,
24+
/// Shared replica commit store; a clone is passed along when an allocation is built,
/// right after the compaction queue.
replica_commit_store: Arc<ReplicaCommitStore>,
2425
}
2426

2527
const MAX_ALLOC_MESSAGE_QUEUE_LEN: usize = 32;
@@ -30,12 +32,14 @@ impl Manager {
3032
meta_store: Arc<Store>,
3133
max_conccurent_allocs: u64,
3234
compaction_queue: Arc<CompactionQueue>,
35+
replica_commit_store: Arc<ReplicaCommitStore>,
3336
) -> Self {
3437
Self {
3538
cache: Cache::new(max_conccurent_allocs),
3639
meta_store,
3740
db_path,
3841
compaction_queue,
42+
replica_commit_store,
3943
}
4044
}
4145

@@ -60,6 +64,7 @@ impl Manager {
6064
path,
6165
dispatcher.clone(),
6266
self.compaction_queue.clone(),
67+
self.replica_commit_store.clone(),
6368
),
6469
connections_futs: JoinSet::new(),
6570
next_conn_id: 0,

libsqlx-server/src/meta.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ use tokio::task::block_in_place;
1010

1111
use crate::allocation::config::AllocConfig;
1212

13-
type ExecFn = Box<dyn FnOnce(&mut libsqlx::libsql::LibsqlConnection<()>)>;
14-
1513
pub struct Store {
1614
env: heed::Env,
1715
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocConfig>>,

libsqlx-server/src/replica_commit_store.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use heed_types::OwnedType;
2+
use libsqlx::FrameNo;
3+
4+
use crate::meta::DatabaseId;
5+
6+
/// Stores, for each replica database, the last injected commit index.
///
/// Each entry is keyed by a [`DatabaseId`] and holds a [`FrameNo`]; entries live in the
/// heed database named by `DB_NAME`, inside the environment given to [`ReplicaCommitStore::new`].
7+
pub struct ReplicaCommitStore {
8+
/// heed environment from which the read and write transactions are opened.
env: heed::Env,
9+
/// Typed database handle: `DatabaseId` keys, `FrameNo` values.
database: heed::Database<OwnedType<DatabaseId>, OwnedType<FrameNo>>,
10+
}
11+
12+
impl ReplicaCommitStore {
13+
/// Name under which the database is created in the heed environment.
const DB_NAME: &str = "replica-commit-store";
14+
/// Builds the store on top of `env`.
///
/// Opens a write transaction, calls `create_database` for `DB_NAME` within it, and
/// commits before returning.
///
/// # Panics
///
/// Panics if the write transaction cannot be opened, if `create_database` fails, or if
/// the transaction cannot be committed (each step is `unwrap`ped).
pub fn new(env: heed::Env) -> Self {
15+
let mut txn = env.write_txn().unwrap();
16+
let database = env.create_database(&mut txn, Some(Self::DB_NAME)).unwrap();
17+
txn.commit().unwrap();
18+
19+
Self { env, database }
20+
}
21+
22+
/// Writes `frame_no` under the key `database_id`, in its own write transaction.
///
/// # Panics
///
/// Panics if the write transaction cannot be opened, if the `put` fails, or if the
/// transaction cannot be committed.
pub fn commit(&self, database_id: DatabaseId, frame_no: FrameNo) {
23+
let mut txn = self.env.write_txn().unwrap();
24+
self.database
25+
.put(&mut txn, &database_id, &frame_no)
26+
.unwrap();
27+
txn.commit().unwrap();
28+
}
29+
30+
/// Reads the frame number stored under `database_id`, using a read transaction.
///
/// Returns whatever the lookup yields: `Some(frame_no)` or `None`.
///
/// # Panics
///
/// Panics if the read transaction cannot be opened or if the lookup returns an error.
pub fn get_commit_index(&self, database_id: DatabaseId) -> Option<FrameNo> {
31+
let txn = self.env.read_txn().unwrap();
32+
self.database.get(&txn, &database_id).unwrap()
33+
}
34+
}

libsqlx/src/database/libsql/injector/hook.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::database::frame::FrameBorrowed;
99
use crate::database::libsql::replication_log::WAL_PAGE_SIZE;
1010

1111
use super::headers::Headers;
12-
use super::{FrameBuffer, InjectorCommitHandler};
12+
use super::{FrameBuffer, OnCommitCb};
1313

1414
// Those are custom error codes returned by the replicator hook.
1515
pub const LIBSQL_INJECT_FATAL: c_int = 200;
@@ -23,15 +23,15 @@ pub struct InjectorHookCtx {
2323
buffer: FrameBuffer,
2424
/// currently in a txn
2525
is_txn: bool,
26-
commit_handler: Box<dyn InjectorCommitHandler>,
26+
on_commit_cb: OnCommitCb,
2727
}
2828

2929
impl InjectorHookCtx {
30-
pub fn new(buffer: FrameBuffer, commit_handler: Box<dyn InjectorCommitHandler>) -> Self {
30+
/// Creates a hook context around `buffer`.
///
/// The context starts with `is_txn` set to `false`, and `commit_handler` is stored in the
/// `on_commit_cb` field. The parameter keeps its previous name although its type is now
/// `OnCommitCb` instead of `Box<dyn InjectorCommitHandler>`.
pub fn new(buffer: FrameBuffer, commit_handler: OnCommitCb) -> Self {
3131
Self {
3232
buffer,
3333
is_txn: false,
34-
commit_handler,
34+
on_commit_cb: commit_handler,
3535
}
3636
}
3737

@@ -45,9 +45,6 @@ impl InjectorHookCtx {
4545
let buffer = self.buffer.lock();
4646
let (mut headers, last_frame_no, size_after) =
4747
make_page_header(buffer.iter().map(|f| &**f));
48-
if size_after != 0 {
49-
self.commit_handler.pre_commit(last_frame_no)?;
50-
}
5148

5249
let ret = unsafe {
5350
orig(
@@ -64,7 +61,7 @@ impl InjectorHookCtx {
6461
debug_assert!(headers.all_applied());
6562
drop(headers);
6663
if size_after != 0 {
67-
self.commit_handler.post_commit(last_frame_no)?;
64+
(self.on_commit_cb)(last_frame_no);
6865
self.is_txn = false;
6966
}
7067
tracing::trace!("applied frame batch");

0 commit comments

Comments
 (0)