From 6971baac8e04bdaed0d40695efcd483ca1e947aa Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 17 Oct 2024 14:00:02 +0200 Subject: [PATCH 1/4] Add failing test. --- packages/sqlite_async/test/watch_test.dart | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index e0ef765..b179009 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -253,5 +253,46 @@ void main() { done = true; } }); + + test('watch with transaction', () async { + final db = await testUtils.setupDatabase(path: path); + await createTables(db); + + const baseTime = 20; + + const throttleDuration = Duration(milliseconds: baseTime); + // delay must be bigger than throttleDuration, and bigger + // than any internal throttles. + const delay = Duration(milliseconds: baseTime * 3); + + final stream = db.watch('SELECT count() AS count FROM assets', + throttle: throttleDuration); + + List counts = []; + + final subscription = stream.listen((e) { + counts.add(e.first['count']); + }); + await Future.delayed(delay); + + await db.writeTransaction((tx) async { + await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test1']); + await Future.delayed(delay); + await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test2']); + await Future.delayed(delay); + }); + await Future.delayed(delay); + + subscription.cancel(); + + expect( + counts, + equals([ + // one event when starting the subscription + 0, + // one event after the transaction + 2 + ])); + }); }); } From 64b349eef52788c57a60f7c7ab605fbb14766781 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 17 Oct 2024 14:35:31 +0200 Subject: [PATCH 2/4] Fix update notifications that should not trigger inside transactions. --- .../database/native_sqlite_connection_impl.dart | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index b7ef76b..299339c 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -286,7 +286,17 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, Object? txError; void maybeFireUpdates() { - if (updatedTables.isNotEmpty) { + // We keep buffering the set of updated tables until we are not + // in a transaction. Firing transactions inside a transaction + // has multiple issues: + // 1. Watched queries would detect changes to the underlying tables, + // but the data would not be visible to queries yet. + // 2. It would trigger many more notifications than required. + // + // This still includes updates for transactions that are rolled back. + // We could handle those better at a later stage. + + if (updatedTables.isNotEmpty && db.autocommit) { client.fire(UpdateNotification(updatedTables)); updatedTables.clear(); updateDebouncer?.cancel(); @@ -301,7 +311,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). // 2. Long-running _SqliteIsolateClosure that should fire updates while running. updateDebouncer ??= - Timer(const Duration(milliseconds: 10), maybeFireUpdates); + Timer(const Duration(milliseconds: 1), maybeFireUpdates); }); server.open((data) async { From 2178d69d896ced8c16bdae055a224e7bac9c7644 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 17 Oct 2024 16:07:24 +0200 Subject: [PATCH 3/4] Throttle update notifications for web implementation. --- .../native_sqlite_connection_impl.dart | 5 +- .../web/worker/throttled_common_database.dart | 197 ++++++++++++++++++ .../lib/src/web/worker/worker_utils.dart | 5 +- packages/sqlite_async/test/watch_test.dart | 9 +- 4 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 299339c..7df4ac8 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -299,9 +299,9 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, if (updatedTables.isNotEmpty && db.autocommit) { client.fire(UpdateNotification(updatedTables)); updatedTables.clear(); - updateDebouncer?.cancel(); - updateDebouncer = null; } + updateDebouncer?.cancel(); + updateDebouncer = null; } db.updates.listen((event) { @@ -316,6 +316,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, server.open((data) async { if (data is _SqliteIsolateClose) { + // This is a transaction close message if (txId != null) { if (!db.autocommit) { db.execute('ROLLBACK'); diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart new file mode 100644 index 0000000..da69d12 --- /dev/null +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -0,0 +1,197 @@ +import 'dart:async'; + +import 'package:sqlite_async/sqlite3_common.dart'; + +/// Wrap a CommonDatabase to throttle its updates stream. +/// This is so that we can throttle the updates _within_ +/// the worker process, avoiding mass notifications over +/// the MessagePort. +class ThrottledCommonDatabase extends CommonDatabase { + final CommonDatabase _db; + final StreamController _transactionController = + StreamController.broadcast(); + + ThrottledCommonDatabase(this._db); + + @override + int get userVersion => _db.userVersion; + + @override + set userVersion(int userVersion) { + _db.userVersion = userVersion; + } + + @override + bool get autocommit => _db.autocommit; + + @override + DatabaseConfig get config => _db.config; + + @override + void createAggregateFunction( + {required String functionName, + required AggregateFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createAggregateFunction(functionName: functionName, function: function); + } + + @override + void createCollation( + {required String name, required CollatingFunction function}) { + _db.createCollation(name: name, function: function); + } + + @override + void createFunction( + {required String functionName, + required ScalarFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createFunction(functionName: functionName, function: function); + } + + @override + void dispose() { + _db.dispose(); + } + + @override + void execute(String sql, [List parameters = const []]) { + _db.execute(sql, parameters); + } + + @override + int getUpdatedRows() { + // ignore: deprecated_member_use + return _db.getUpdatedRows(); + } + + @override + int get lastInsertRowId => _db.lastInsertRowId; + + @override + CommonPreparedStatement prepare(String sql, + {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { + return _db.prepare(sql, + persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); + } + + @override + List prepareMultiple(String sql, + {bool persistent = false, bool vtab = true}) { + return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); + } + + @override + ResultSet select(String sql, [List parameters = const []]) { + bool preAutocommit = _db.autocommit; + final result = _db.select(sql, parameters); + bool postAutocommit = _db.autocommit; + if (!preAutocommit && postAutocommit) { + _transactionController.add(true); + } + return result; + } + + @override + int get updatedRows => _db.updatedRows; + + @override + Stream get updates { + return throttledUpdates(_db, _transactionController.stream); + } +} + +/// This throttles the database update stream to: +/// 1. Trigger max once every 1ms. +/// 2. Only trigger _after_ transactions. +Stream throttledUpdates( + CommonDatabase source, Stream transactionStream) { + StreamController? controller; + Set insertedTables = {}; + Set updatedTables = {}; + Set deletedTables = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!source.autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (updatedTables.isNotEmpty) { + for (var tableName in updatedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.update, tableName, 0)); + } + + updatedTables.clear(); + } + + if (insertedTables.isNotEmpty) { + for (var tableName in insertedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.insert, tableName, 0)); + } + + insertedTables.clear(); + } + + if (deletedTables.isNotEmpty) { + for (var tableName in deletedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.delete, tableName, 0)); + } + + deletedTables.clear(); + } + } + + void collectUpdate(SqliteUpdate event) { + if (event.kind == SqliteUpdateKind.insert) { + insertedTables.add(event.tableName); + } else if (event.kind == SqliteUpdateKind.update) { + updatedTables.add(event.tableName); + } else if (event.kind == SqliteUpdateKind.delete) { + deletedTables.add(event.tableName); + } + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = transactionStream.listen((event) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; +} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index b4657dd..1d8fb5c 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -4,6 +4,7 @@ import 'dart:js_util' as js_util; import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'throttled_common_database.dart'; import '../protocol.dart'; @@ -18,7 +19,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - return AsyncSqliteDatabase(database: db); + final throttled = ThrottledCommonDatabase(db); + + return AsyncSqliteDatabase(database: throttled); } @override diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index b179009..08a80cb 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -258,7 +258,7 @@ void main() { final db = await testUtils.setupDatabase(path: path); await createTables(db); - const baseTime = 20; + const baseTime = 10; const throttleDuration = Duration(milliseconds: baseTime); // delay must be bigger than throttleDuration, and bigger @@ -293,6 +293,13 @@ void main() { // one event after the transaction 2 ])); + + // Other observed results (failure scenarios): + // [0, 0, 0]: The watch is triggered during the transaction + // and executes concurrently with the transaction. + // [0, 2, 2]: The watch is triggered during the transaction, + // but executes after the transaction (single connection). + // [0]: No updates triggered. }); }); } From deaaba52c1f30c91fc67129dc513b4278f5fbe58 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 17 Oct 2024 16:17:53 +0200 Subject: [PATCH 4/4] Simplify update throttling. --- .../web/worker/throttled_common_database.dart | 38 ++++--------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart index da69d12..ea73bd6 100644 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -111,9 +111,7 @@ class ThrottledCommonDatabase extends CommonDatabase { Stream throttledUpdates( CommonDatabase source, Stream transactionStream) { StreamController? controller; - Set insertedTables = {}; - Set updatedTables = {}; - Set deletedTables = {}; + Set pendingUpdates = {}; var paused = false; Timer? updateDebouncer; @@ -132,39 +130,19 @@ Stream throttledUpdates( return; } - if (updatedTables.isNotEmpty) { - for (var tableName in updatedTables) { - controller!.add(SqliteUpdate(SqliteUpdateKind.update, tableName, 0)); + if (pendingUpdates.isNotEmpty) { + for (var update in pendingUpdates) { + controller!.add(update); } - updatedTables.clear(); - } - - if (insertedTables.isNotEmpty) { - for (var tableName in insertedTables) { - controller!.add(SqliteUpdate(SqliteUpdateKind.insert, tableName, 0)); - } - - insertedTables.clear(); - } - - if (deletedTables.isNotEmpty) { - for (var tableName in deletedTables) { - controller!.add(SqliteUpdate(SqliteUpdateKind.delete, tableName, 0)); - } - - deletedTables.clear(); + pendingUpdates.clear(); } } void collectUpdate(SqliteUpdate event) { - if (event.kind == SqliteUpdateKind.insert) { - insertedTables.add(event.tableName); - } else if (event.kind == SqliteUpdateKind.update) { - updatedTables.add(event.tableName); - } else if (event.kind == SqliteUpdateKind.delete) { - deletedTables.add(event.tableName); - } + // We merge updates with the same kind and tableName. + // rowId is never used in sqlite_async. + pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); updateDebouncer ??= Timer(const Duration(milliseconds: 1), maybeFireUpdates);