@@ -97,6 +97,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
9797
9898 let mut last_op: Option < i64 > = None ;
9999 let mut add_checksum: i32 = 0 ;
100+ let mut op_checksum: i32 = 0 ;
101+ let mut remove_operations: i32 = 0 ;
100102
101103 while iterate_statement. step ( ) ? == ResultCode :: ROW {
102104 let op_id = iterate_statement. column_int64 ( 0 ) ?;
@@ -126,6 +128,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
126128 let superseded_op = supersede_statement. column_int64 ( 0 ) ?;
127129 let supersede_checksum = supersede_statement. column_int ( 1 ) ?;
128130 add_checksum = add_checksum. wrapping_add ( supersede_checksum) ;
131+ op_checksum = op_checksum. wrapping_sub ( supersede_checksum) ;
129132
130133 if superseded_op <= last_applied_op {
131134 // Superseded an operation previously applied - we cannot skip removes
@@ -172,6 +175,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
172175
173176 insert_statement. bind_int ( 8 , checksum) ?;
174177 insert_statement. exec ( ) ?;
178+
179+ op_checksum = op_checksum. wrapping_add ( checksum) ;
180+
181+ if opi == 4 {
182+ // We persisted a REMOVE statement, so the bucket needs
183+ // to be compacted at some point.
184+ remove_operations += 1 ;
185+ }
175186 } else if op == "MOVE" {
176187 add_checksum = add_checksum. wrapping_add ( checksum) ;
177188 } else if op == "CLEAR" {
@@ -185,14 +196,15 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
185196 // We also replace the checksum with the checksum of the CLEAR op.
186197 // language=SQLite
187198 let clear_statement2 = db. prepare_v2 (
188- "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2" ,
199+ "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2" ,
189200 ) ?;
190201 clear_statement2. bind_text ( 2 , bucket, sqlite:: Destructor :: STATIC ) ?;
191202 clear_statement2. bind_int ( 1 , checksum) ?;
192203 clear_statement2. exec ( ) ?;
193204
194205 add_checksum = 0 ;
195206 last_applied_op = 0 ;
207+ op_checksum = 0 ;
196208 }
197209 }
198210
@@ -201,12 +213,16 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
201213 let statement = db. prepare_v2 (
202214 "UPDATE ps_buckets
203215 SET last_op = ?2,
204- add_checksum = add_checksum + ?3
216+ add_checksum = (add_checksum + ?3) & 0xffffffff,
217+ op_checksum = (op_checksum + ?4) & 0xffffffff,
218+ remove_operations = (remove_operations + ?5)
205219 WHERE name = ?1" ,
206220 ) ?;
207221 statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
208222 statement. bind_int64 ( 2 , * last_op) ?;
209223 statement. bind_int ( 3 , add_checksum) ?;
224+ statement. bind_int ( 4 , op_checksum) ?;
225+ statement. bind_int ( 5 , remove_operations) ?;
210226
211227 statement. exec ( ) ?;
212228 }
@@ -216,17 +232,34 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
216232
217233pub fn clear_remove_ops ( db : * mut sqlite:: sqlite3 , _data : & str ) -> Result < ( ) , SQLiteError > {
218234 // language=SQLite
219- let statement =
220- db. prepare_v2 ( "SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0" ) ?;
235+ let statement = db. prepare_v2 (
236+ "
237+ SELECT
238+ name,
239+ last_applied_op,
240+ (SELECT IFNULL(SUM(oplog.hash), 0)
241+ FROM ps_oplog oplog
242+ WHERE oplog.bucket = ps_buckets.name
243+ AND oplog.op_id <= ps_buckets.last_applied_op
244+ AND (oplog.superseded = 1 OR oplog.op != 3)
245+ ) as checksum
246+ FROM ps_buckets
247+ WHERE ps_buckets.pending_delete = 0 AND
248+ ps_buckets.remove_operations >= CASE
249+ WHEN ?1 = '' THEN 1
250+ ELSE IFNULL(?1 ->> 'threshold', 1)
251+ END" ,
252+ ) ?;
253+ // Compact bucket if there are 50 or more operations
254+ statement. bind_text ( 1 , _data, sqlite:: Destructor :: STATIC ) ;
221255
222256 // language=SQLite
223257 let update_statement = db. prepare_v2 (
224- "UPDATE ps_buckets
225- SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
226- FROM ps_oplog AS oplog
227- WHERE (superseded = 1 OR op != 3)
228- AND oplog.bucket = ?1
229- AND oplog.op_id <= ?2)
258+ "
259+ UPDATE ps_buckets
260+ SET add_checksum = (add_checksum + ?2) & 0xffffffff,
261+ op_checksum = (op_checksum - ?2) & 0xffffffff,
262+ remove_operations = 0
230263 WHERE ps_buckets.name = ?1" ,
231264 ) ?;
232265
@@ -243,10 +276,10 @@ pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQL
243276 // Note: Each iteration here may be run in a separate transaction.
244277 let name = statement. column_text ( 0 ) ?;
245278 let last_applied_op = statement. column_int64 ( 1 ) ?;
279+ let checksum = statement. column_int ( 2 ) ?;
246280
247281 update_statement. bind_text ( 1 , name, sqlite:: Destructor :: STATIC ) ?;
248- update_statement. bind_int64 ( 2 , last_applied_op) ?;
249-
282+ update_statement. bind_int ( 2 , checksum) ?;
250283 update_statement. exec ( ) ?;
251284
252285 // Must use the same values as above
0 commit comments