From a1deb8e5c3bcda256c59c020799d59fc3db997f1 Mon Sep 17 00:00:00 2001 From: Dominic Jack Date: Mon, 25 Nov 2024 14:06:53 +1000 Subject: [PATCH 01/11] Added commit / rollback hooks --- sqlite3/assets/sqlite3.h | 2 + sqlite3/assets/wasm/bridge.h | 10 ++- sqlite3/assets/wasm/helpers.c | 8 ++ sqlite3/lib/src/database.dart | 29 ++++++ sqlite3/lib/src/ffi/bindings.dart | 56 ++++++++++++ sqlite3/lib/src/ffi/sqlite3.g.dart | 54 ++++++++++++ sqlite3/lib/src/functions.dart | 3 + sqlite3/lib/src/implementation/bindings.dart | 6 ++ sqlite3/lib/src/implementation/database.dart | 58 ++++++++++++ sqlite3/lib/src/wasm/bindings.dart | 14 +++ sqlite3/lib/src/wasm/wasm_interop.dart | 22 ++++- sqlite3/test/common/database.dart | 92 ++++++++++++++++++++ sqlite3_web/lib/src/client.dart | 16 ++++ sqlite3_web/lib/src/database.dart | 6 ++ sqlite3_web/lib/src/protocol.dart | 54 ++++++++++++ sqlite3_web/lib/src/worker.dart | 18 ++++ 16 files changed, 443 insertions(+), 5 deletions(-) diff --git a/sqlite3/assets/sqlite3.h b/sqlite3/assets/sqlite3.h index e457ca22..82dfdd99 100644 --- a/sqlite3/assets/sqlite3.h +++ b/sqlite3/assets/sqlite3.h @@ -35,6 +35,8 @@ void *sqlite3_update_hook(sqlite3 *, void (*)(void *, int, sqlite3_char const *, sqlite3_char const *, int64_t), void *); +void *sqlite3_commit_hook(sqlite3*, int(*)(void *), void*); +void *sqlite3_rollback_hook(sqlite3*, void(*)(void *), void*); int sqlite3_get_autocommit(sqlite3 *db); // Statements diff --git a/sqlite3/assets/wasm/bridge.h b/sqlite3/assets/wasm/bridge.h index 06bc9c46..23401b97 100644 --- a/sqlite3/assets/wasm/bridge.h +++ b/sqlite3/assets/wasm/bridge.h @@ -50,10 +50,12 @@ import_dart("function_xInverse") extern void dartXInverse( import_dart("function_xFinal") extern void dartXFinal(sqlite3_context *ctx); import_dart("function_xValue") extern void dartXValue(sqlite3_context *ctx); import_dart("function_forget") extern void dartForgetAboutFunction(void *ptr); -import_dart("function_hook") extern void dartUpdateHook(void *id, int kind, - const char *db, - const char *table, - sqlite3_int64 rowid); +import_dart("function_update_hook") extern void dartUpdateHook(void *id, int kind, + const char *db, + const char *table, + sqlite3_int64 rowid); +import_dart("function_commit_hook") extern int dartCommitHook(void *id); +import_dart("function_rollback_hook") extern void dartRollbackHook(void *id); import_dart("function_compare") extern int dartXCompare(void *id, int lengthA, const void *a, int lengthB, diff --git a/sqlite3/assets/wasm/helpers.c b/sqlite3/assets/wasm/helpers.c index 380104ca..e3a4f004 100644 --- a/sqlite3/assets/wasm/helpers.c +++ b/sqlite3/assets/wasm/helpers.c @@ -223,6 +223,14 @@ SQLITE_API void dart_sqlite3_updates(sqlite3 *db, int id) { sqlite3_update_hook(db, id >= 0 ? &dartUpdateHook : NULL, (void *)id); } +SQLITE_API void dart_sqlite3_commits(sqlite3 *db, int id) { + sqlite3_commit_hook(db, id >= 0 ? &dartCommitHook : NULL, (void *)id); +} + +SQLITE_API void dart_sqlite3_rollbacks(sqlite3 *db, int id) { + sqlite3_rollback_hook(db, id >= 0 ? &dartRollbackHook : NULL, (void *)id); +} + SQLITE_API int dart_sqlite3_create_collation(sqlite3 *db, const char *zName, int eTextRep, int id) { return sqlite3_create_collation_v2(db, zName, eTextRep, (void *)id, diff --git a/sqlite3/lib/src/database.dart b/sqlite3/lib/src/database.dart index 0bd2214f..720ecbc3 100644 --- a/sqlite3/lib/src/database.dart +++ b/sqlite3/lib/src/database.dart @@ -51,6 +51,35 @@ abstract class CommonDatabase { /// - [Data Change Notification Callbacks](https://www.sqlite.org/c3ref/update_hook.html) Stream get updates; + /// The [VoidPredicate] that is used to filter out transactions before commiting. + /// + /// This is run before every commit, i.e. before the end of an explicit + /// transaction and before the end of an implicit transactions created by + /// an insert / update / delete operation. + /// + /// If the filter returns `false`, the commit is converted into a rollback. + /// + /// The function should not do anything that modifies the database connection, + /// e.g. run SQL statements, prepare statements or step. + /// + /// See also: + /// - [Commit Hooks](https://www.sqlite.org/c3ref/commit_hook.html) + VoidPredicate? get commitFilter; + set commitFilter(VoidPredicate? commitFilter); + + /// An async stream that fires after each rollback. + /// + /// Listening to this stream will register an "update hook" on the native + /// database. Each rollback that sqlite3 reports through that hook will then + /// be added to the stream. + /// + /// Note that the stream reports updates _asynchronously_, e.g. one event + /// loop iteration after sqlite reports them. + /// + /// See also: + /// - [Commit Hooks](https://www.sqlite.org/c3ref/commit_hook.html) + Stream get rollbacks; + /// Executes the [sql] statement with the provided [parameters], ignoring any /// rows returned by the statement. /// diff --git a/sqlite3/lib/src/ffi/bindings.dart b/sqlite3/lib/src/ffi/bindings.dart index 4487b141..74e88c76 100644 --- a/sqlite3/lib/src/ffi/bindings.dart +++ b/sqlite3/lib/src/ffi/bindings.dart @@ -409,6 +409,8 @@ final class FfiDatabase extends RawSqliteDatabase { final BindingsWithLibrary bindings; final Pointer db; NativeCallable<_UpdateHook>? _installedUpdateHook; + NativeCallable<_CommitHook>? _installedCommitHook; + NativeCallable<_RollbackHook>? _installedRollbackHook; FfiDatabase(this.bindings, this.db); @@ -560,6 +562,37 @@ final class FfiDatabase extends RawSqliteDatabase { previous?.close(); } + @override + void sqlite3_commit_hook(RawCommitHook? hook) { + final previous = _installedCommitHook; + + if (hook == null) { + _installedCommitHook = null; + bindings.bindings.sqlite3_commit_hook(db, nullPtr(), nullPtr()); + } else { + final native = _installedCommitHook = hook.toNative(); + bindings.bindings + .sqlite3_commit_hook(db, native.nativeFunction, nullPtr()); + } + + previous?.close(); + } + + @override + void sqlite3_rollback_hook(RawRollbackHook? hook) { + final previous = _installedRollbackHook; + + if (hook == null) { + bindings.bindings.sqlite3_rollback_hook(db, nullPtr(), nullPtr()); + } else { + final native = _installedRollbackHook = hook.toNative(); + bindings.bindings + .sqlite3_rollback_hook(db, native.nativeFunction, nullPtr()); + } + + previous?.close(); + } + @override int sqlite3_db_config(int op, int value) { final result = bindings.bindings.sqlite3_db_config( @@ -970,6 +1003,8 @@ typedef _XCompare = Int Function( Pointer, Int, Pointer, Int, Pointer); typedef _UpdateHook = Void Function( Pointer, Int, Pointer, Pointer, Int64); +typedef _CommitHook = Int Function(Pointer); +typedef _RollbackHook = Void Function(Pointer); extension on RawXFunc { NativeCallable<_XFunc> toNative(Bindings bindings) { @@ -1023,3 +1058,24 @@ extension on RawUpdateHook { )..keepIsolateAlive = false; } } + +extension on RawCommitHook { + NativeCallable<_CommitHook> toNative() { + return NativeCallable.isolateLocal( + (Pointer _) { + return this(); + }, + exceptionalReturn: 1, + )..keepIsolateAlive = false; + } +} + +extension on RawRollbackHook { + NativeCallable<_RollbackHook> toNative() { + return NativeCallable.isolateLocal( + (Pointer _) { + this(); + }, + )..keepIsolateAlive = false; + } +} diff --git a/sqlite3/lib/src/ffi/sqlite3.g.dart b/sqlite3/lib/src/ffi/sqlite3.g.dart index 288f485e..ed1c8336 100644 --- a/sqlite3/lib/src/ffi/sqlite3.g.dart +++ b/sqlite3/lib/src/ffi/sqlite3.g.dart @@ -308,6 +308,60 @@ class Bindings { ffi.Int64)>>, ffi.Pointer)>(); + ffi.Pointer sqlite3_commit_hook( + ffi.Pointer arg0, + ffi.Pointer)>> + arg1, + ffi.Pointer arg2, + ) { + return _sqlite3_commit_hook( + arg0, + arg1, + arg2, + ); + } + + late final _sqlite3_commit_hookPtr = _lookup< + ffi.NativeFunction< + ffi.Pointer Function( + ffi.Pointer, + ffi.Pointer< + ffi.NativeFunction)>>, + ffi.Pointer)>>('sqlite3_commit_hook'); + late final _sqlite3_commit_hook = _sqlite3_commit_hookPtr.asFunction< + ffi.Pointer Function( + ffi.Pointer, + ffi.Pointer< + ffi.NativeFunction)>>, + ffi.Pointer)>(); + + ffi.Pointer sqlite3_rollback_hook( + ffi.Pointer arg0, + ffi.Pointer)>> + arg1, + ffi.Pointer arg2, + ) { + return _sqlite3_rollback_hook( + arg0, + arg1, + arg2, + ); + } + + late final _sqlite3_rollback_hookPtr = _lookup< + ffi.NativeFunction< + ffi.Pointer Function( + ffi.Pointer, + ffi.Pointer< + ffi.NativeFunction)>>, + ffi.Pointer)>>('sqlite3_rollback_hook'); + late final _sqlite3_rollback_hook = _sqlite3_rollback_hookPtr.asFunction< + ffi.Pointer Function( + ffi.Pointer, + ffi.Pointer< + ffi.NativeFunction)>>, + ffi.Pointer)>(); + int sqlite3_get_autocommit( ffi.Pointer db, ) { diff --git a/sqlite3/lib/src/functions.dart b/sqlite3/lib/src/functions.dart index 12a82cce..57450d2c 100644 --- a/sqlite3/lib/src/functions.dart +++ b/sqlite3/lib/src/functions.dart @@ -1,5 +1,8 @@ import 'package:meta/meta.dart'; +/// A filter function without any arguments. +typedef VoidPredicate = bool Function(); + /// A collating function provided to a sql collation. /// /// The function must return a `int`. diff --git a/sqlite3/lib/src/implementation/bindings.dart b/sqlite3/lib/src/implementation/bindings.dart index 02cf6be3..ebda1ba2 100644 --- a/sqlite3/lib/src/implementation/bindings.dart +++ b/sqlite3/lib/src/implementation/bindings.dart @@ -57,6 +57,8 @@ typedef RawXFunc = void Function(RawSqliteContext, List); typedef RawXStep = void Function(RawSqliteContext, List); typedef RawXFinal = void Function(RawSqliteContext); typedef RawUpdateHook = void Function(int kind, String tableName, int rowId); +typedef RawCommitHook = int Function(); +typedef RawRollbackHook = void Function(); typedef RawCollation = int Function(String? a, String? b); abstract base class RawSqliteDatabase { @@ -79,6 +81,10 @@ abstract base class RawSqliteDatabase { void sqlite3_update_hook(RawUpdateHook? hook); + void sqlite3_commit_hook(RawCommitHook? hook); + + void sqlite3_rollback_hook(RawRollbackHook? hook); + /// Returns a compiler able to create prepared statements from the utf8- /// encoded SQL string passed as its argument. RawStatementCompiler newCompiler(List utf8EncodedSql); diff --git a/sqlite3/lib/src/implementation/database.dart b/sqlite3/lib/src/implementation/database.dart index 4ead4300..497793c7 100644 --- a/sqlite3/lib/src/implementation/database.dart +++ b/sqlite3/lib/src/implementation/database.dart @@ -58,6 +58,9 @@ base class DatabaseImplementation implements CommonDatabase { final FinalizableDatabase finalizable; final List> _updateListeners = []; + final List> _rollbackListeners = []; + + VoidPredicate? _commitFilter; var _isClosed = false; @@ -228,6 +231,8 @@ base class DatabaseImplementation implements CommonDatabase { listener.close(); } database.sqlite3_update_hook(null); + database.sqlite3_commit_hook(null); + database.sqlite3_rollback_hook(null); finalizable.dispose(); } @@ -459,6 +464,59 @@ base class DatabaseImplementation implements CommonDatabase { isBroadcast: true, ); } + + @override + Stream get rollbacks { + return Stream.multi( + (newListener) { + if (_isClosed) { + newListener.closeSync(); + return; + } + + void addRollbackListener() { + final isFirstListener = _rollbackListeners.isEmpty; + _rollbackListeners.add(newListener); + + if (isFirstListener) { + // Add native rollback hook + database.sqlite3_rollback_hook(() { + for (final listener in _rollbackListeners) { + listener.add(null); + } + }); + } + } + + void removeRollbackListener() { + _rollbackListeners.remove(newListener); + + if (_rollbackListeners.isEmpty && !_isClosed) { + database.sqlite3_rollback_hook(null); // Remove native hook + } + } + + newListener + ..onPause = removeRollbackListener + ..onCancel = removeRollbackListener + ..onResume = addRollbackListener; + + // Since this is a onListen callback, add listener now + addRollbackListener(); + }, + isBroadcast: true, + ); + } + + @override + VoidPredicate? get commitFilter => _commitFilter; + + @override + set commitFilter(VoidPredicate? commitFilter) { + _commitFilter = commitFilter; + database.sqlite3_commit_hook( + commitFilter == null ? null : () => commitFilter() ? 0 : 1); + } } extension on RawSqliteContext { diff --git a/sqlite3/lib/src/wasm/bindings.dart b/sqlite3/lib/src/wasm/bindings.dart index f6245d2d..e1852dd0 100644 --- a/sqlite3/lib/src/wasm/bindings.dart +++ b/sqlite3/lib/src/wasm/bindings.dart @@ -248,6 +248,20 @@ final class WasmDatabase extends RawSqliteDatabase { bindings.dart_sqlite3_updates(db, hook != null ? 1 : -1); } + @override + void sqlite3_commit_hook(RawCommitHook? hook) { + bindings.callbacks.installedCommitHook = hook; + + bindings.dart_sqlite3_commits(db, hook != null ? 1 : -1); + } + + @override + void sqlite3_rollback_hook(RawRollbackHook? hook) { + bindings.callbacks.installedRollbackHook = hook; + + bindings.dart_sqlite3_rollbacks(db, hook != null ? 1 : -1); + } + @override int sqlite3_get_autocommit() { return bindings.sqlite3_get_autocommit(db); diff --git a/sqlite3/lib/src/wasm/wasm_interop.dart b/sqlite3/lib/src/wasm/wasm_interop.dart index 75a1ca94..40627001 100644 --- a/sqlite3/lib/src/wasm/wasm_interop.dart +++ b/sqlite3/lib/src/wasm/wasm_interop.dart @@ -33,6 +33,8 @@ class WasmBindings { _register_vfs, _unregister_vfs, _update_hooks, + _commit_hooks, + _rollback_hooks, _sqlite3_libversion, _sqlite3_sourceid, _sqlite3_libversion_number, @@ -103,6 +105,8 @@ class WasmBindings { _register_vfs = instance.functions['dart_sqlite3_register_vfs']!, _unregister_vfs = instance.functions['sqlite3_vfs_unregister']!, _update_hooks = instance.functions['dart_sqlite3_updates']!, + _commit_hooks = instance.functions['dart_sqlite3_commits']!, + _rollback_hooks = instance.functions['dart_sqlite3_rollbacks']!, _sqlite3_libversion = instance.functions['sqlite3_libversion']!, _sqlite3_sourceid = instance.functions['sqlite3_sourceid']!, _sqlite3_libversion_number = @@ -279,6 +283,14 @@ class WasmBindings { _update_hooks.callReturningVoid2(db.toJS, id.toJS); } + int dart_sqlite3_commits(Pointer db, int id) { + return _commit_hooks.callReturningInt2(db.toJS, id.toJS); + } + + void dart_sqlite3_rollbacks(Pointer db, int id) { + return _rollback_hooks.callReturningVoid2(db.toJS, id.toJS); + } + int sqlite3_exec(Pointer db, Pointer sql, Pointer callback, Pointer callbackArg, Pointer errorOut) { return _sqlite3_exec.callReturningInt5( @@ -719,13 +731,19 @@ class _InjectedValues { return callbacks.functions[ctx]!.collation!(aStr, bStr); }).toJS, - 'function_hook': + 'function_update_hook': ((int id, int kind, Pointer _, Pointer table, JSBigInt rowId) { final tableName = memory.readString(table); callbacks.installedUpdateHook ?.call(kind, tableName, JsBigInt(rowId).asDartInt); }).toJS, + 'function_commit_hook': ((int id) { + return callbacks.installedCommitHook?.call(); + }).toJS, + 'function_rollback_hook': ((int id) { + callbacks.installedRollbackHook?.call(); + }).toJS, } }; } @@ -742,6 +760,8 @@ class DartCallbacks { final Map openedFiles = {}; RawUpdateHook? installedUpdateHook; + RawCommitHook? installedCommitHook; + RawRollbackHook? installedRollbackHook; int register(RegisteredFunctionSet set) { final id = _id++; diff --git a/sqlite3/test/common/database.dart b/sqlite3/test/common/database.dart index 1b314a42..388ac16e 100644 --- a/sqlite3/test/common/database.dart +++ b/sqlite3/test/common/database.dart @@ -707,6 +707,98 @@ void testDatabase( }); }); + group('rollback stream', () { + setUp(() { + database.execute('CREATE TABLE tbl (a TEXT, b INT);'); + }); + + test('emits on rollback', () { + expect(database.rollbacks, emits(isA())); + + database.execute('BEGIN TRANSACTION;'); + database.execute("ROLLBACK;"); + }); + + test('emits on rollback after insert', () { + expect(database.rollbacks, emits(isA())); + + database.execute('BEGIN TRANSACTION;'); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + database.execute("ROLLBACK;"); + }); + + test('emits on rollback after erroneous SQL', () { + expect(database.rollbacks, emits(isA())); + + database.execute('BEGIN TRANSACTION;'); + try { + database.execute('Erroneous SQL'); + } catch (_) { + // ignore + } + database.execute("ROLLBACK;"); + }); + }); + + group('commit filter', () { + setUp(() { + database.execute('CREATE TABLE tbl (a TEXT, b INT);'); + }); + + test('explicit commits with always fails filter raises exception', () { + database.commitFilter = () => false; + expect(() { + database.execute('BEGIN TRANSACTION;'); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + database.execute("COMMIT;"); + }, + throwsA(predicate((e) => + e.operation == 'executing' && + e.message.startsWith('constraint failed')))); + }); + + test('implicit commits with always fails filter raises exception', () { + database.commitFilter = () => false; + expect( + () => database.execute("INSERT INTO tbl VALUES ('', 1);"), + throwsA(predicate((e) => + e.operation == 'executing' && + e.message.startsWith('constraint failed')))); + }); + + test('side effects run on explicit commit', () { + var sideEffects = 0; + database.commitFilter = () { + ++sideEffects; + return true; + }; + + database.execute('BEGIN TRANSACTION;'); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + database.execute("COMMIT;"); + // ensure the transaction committed correctly + expect(database.select('SELECT COUNT(*) AS c FROM tbl;').first['c'], + equals(1)); + // ensure side-effects ran + expect(sideEffects, equals(1)); + }); + + test('side effects run on implicit commit', () { + var sideEffects = 0; + database.commitFilter = () { + ++sideEffects; + return true; + }; + + database.execute("INSERT INTO tbl VALUES ('', 1);"); + // ensure the transaction committed correctly + expect(database.select('SELECT COUNT(*) AS c FROM tbl;').first['c'], + equals(1)); + // ensure side-effects ran + expect(sideEffects, equals(1)); + }); + }); + group('unicode handling', () { test('accents in statements', () { final table = 'télé'; // with accent diff --git a/sqlite3_web/lib/src/client.dart b/sqlite3_web/lib/src/client.dart index c21f0dd8..f9a10b95 100644 --- a/sqlite3_web/lib/src/client.dart +++ b/sqlite3_web/lib/src/client.dart @@ -21,6 +21,7 @@ final class RemoteDatabase implements Database { StreamSubscription? _notificationSubscription; final StreamController _updates = StreamController.broadcast(); + final StreamController _rollbacks = StreamController.broadcast(); RemoteDatabase({required this.connection, required this.databaseId}) { _updates @@ -35,12 +36,14 @@ final class RemoteDatabase implements Database { }); _requestUpdates(true); + _requestRollbacks(true); }) ..onCancel = (() { _notificationSubscription?.cancel(); _notificationSubscription = null; _requestUpdates(false); + _requestRollbacks(false); }); } @@ -54,6 +57,16 @@ final class RemoteDatabase implements Database { } } + void _requestRollbacks(bool sendRollbacks) { + if (!_isClosed) { + connection.sendRequest( + RollbackStreamRequest( + action: sendRollbacks, requestId: 1, databaseId: databaseId), + MessageType.simpleSuccessResponse, + ); + } + } + @override Future get closed { return connection.closed; @@ -126,6 +139,9 @@ final class RemoteDatabase implements Database { @override Stream get updates => _updates.stream; + @override + Stream get rollbacks => _rollbacks.stream; + @override Future get userVersion async { final result = await select('pragma user_version;'); diff --git a/sqlite3_web/lib/src/database.dart b/sqlite3_web/lib/src/database.dart index 15e78890..722cd097 100644 --- a/sqlite3_web/lib/src/database.dart +++ b/sqlite3_web/lib/src/database.dart @@ -50,6 +50,12 @@ abstract class Database { /// stream is active. Stream get updates; + /// A relayed stream of events triggered by rollbacks from the remote worker. + /// + /// Updates are only sent across worker channels while a subscription to this + /// stream is active. + Stream get rollbacks; + /// A future that resolves when the database is closed. /// /// Typically, databases are closed because [dispose] is called. For databases diff --git a/sqlite3_web/lib/src/protocol.dart b/sqlite3_web/lib/src/protocol.dart index 90f58310..044a7324 100644 --- a/sqlite3_web/lib/src/protocol.dart +++ b/sqlite3_web/lib/src/protocol.dart @@ -26,6 +26,7 @@ enum MessageType { connect(), startFileSystemServer(), updateRequest(), + rollbackRequest(), simpleSuccessResponse(), rowsResponse(), errorResponse(), @@ -94,6 +95,7 @@ sealed class Message { MessageType.openAdditionalConnection => OpenAdditonalConnection.deserialize(object), MessageType.updateRequest => UpdateStreamRequest.deserialize(object), + MessageType.rollbackRequest => RollbackStreamRequest.deserialize(object), MessageType.simpleSuccessResponse => SimpleSuccessResponse.deserialize(object), MessageType.endpointResponse => EndpointResponse.deserialize(object), @@ -660,6 +662,37 @@ final class UpdateStreamRequest extends Request { } } +final class RollbackStreamRequest extends Request { + /// When true, the client is requesting to be informed about rollbacks + /// happening on the database identified by this request. + /// + /// When false, the client is requesting to no longer be informed about these + /// updates. + final bool action; + + RollbackStreamRequest( + {required this.action, + required super.requestId, + required super.databaseId}); + + factory RollbackStreamRequest.deserialize(JSObject object) { + return RollbackStreamRequest( + action: (object[_UniqueFieldNames.action] as JSBoolean).toDart, + requestId: object.requestId, + databaseId: object.databaseId, + ); + } + + @override + MessageType get type => MessageType.rollbackRequest; + + @override + void serialize(JSObject object, List transferred) { + super.serialize(object, transferred); + object[_UniqueFieldNames.action] = action.toJS; + } +} + class CompatibilityCheck extends Request { @override final MessageType type; @@ -816,6 +849,27 @@ final class UpdateNotification extends Notification { } } +final class RollbackNotification extends Notification { + final int databaseId; + + RollbackNotification({required this.databaseId}); + + factory RollbackNotification.deserialize(JSObject object) { + return RollbackNotification( + databaseId: object.databaseId, + ); + } + + @override + MessageType get type => MessageType.notifyUpdate; + + @override + void serialize(JSObject object, List transferred) { + super.serialize(object, transferred); + object[_UniqueFieldNames.databaseId] = databaseId.toJS; + } +} + extension on JSObject { int get requestId { return (this[_UniqueFieldNames.id] as JSNumber).toDartInt; diff --git a/sqlite3_web/lib/src/worker.dart b/sqlite3_web/lib/src/worker.dart index 84411bc8..720c6cb1 100644 --- a/sqlite3_web/lib/src/worker.dart +++ b/sqlite3_web/lib/src/worker.dart @@ -111,6 +111,7 @@ final class _ConnectionDatabase { final int id; StreamSubscription? updates; + StreamSubscription? rollbacks; _ConnectionDatabase(this.database, [int? id]) : id = id ?? database.id; @@ -229,6 +230,23 @@ final class _ClientConnection extends ProtocolChannel } return SimpleSuccessResponse( response: null, requestId: request.requestId); + case RollbackStreamRequest(action: true): + if (database!.rollbacks == null) { + final rawDatabase = await database.database.opened; + database.rollbacks ??= rawDatabase.database.rollbacks.listen((_) { + sendNotification( + RollbackNotification(databaseId: database.database.id)); + }); + } + return SimpleSuccessResponse( + response: null, requestId: request.requestId); + case RollbackStreamRequest(action: false): + if (database!.rollbacks != null) { + database.rollbacks?.cancel(); + database.rollbacks = null; + } + return SimpleSuccessResponse( + response: null, requestId: request.requestId); case OpenAdditonalConnection(): final database = _databaseFor(request)!.database; database.refCount++; From 969517bf1d4d1f4875d968f26a4f7a7fbde94b4e Mon Sep 17 00:00:00 2001 From: Dominic Jack Date: Sat, 18 Jan 2025 18:53:24 +1000 Subject: [PATCH 02/11] added commits stream --- sqlite3/assets/wasm/bridge.h | 8 +- sqlite3/lib/src/database.dart | 19 ++++- sqlite3/lib/src/implementation/database.dart | 81 +++++++++++++++++++- sqlite3/lib/src/wasm/wasm_interop.dart | 2 +- sqlite3/test/common/database.dart | 34 ++++++++ sqlite3_web/lib/src/client.dart | 74 +++++++++++++++--- sqlite3_web/lib/src/database.dart | 6 ++ sqlite3_web/lib/src/protocol.dart | 58 ++++++++++++++ sqlite3_web/lib/src/worker.dart | 18 +++++ 9 files changed, 279 insertions(+), 21 deletions(-) diff --git a/sqlite3/assets/wasm/bridge.h b/sqlite3/assets/wasm/bridge.h index 23401b97..9e1115a5 100644 --- a/sqlite3/assets/wasm/bridge.h +++ b/sqlite3/assets/wasm/bridge.h @@ -50,10 +50,10 @@ import_dart("function_xInverse") extern void dartXInverse( import_dart("function_xFinal") extern void dartXFinal(sqlite3_context *ctx); import_dart("function_xValue") extern void dartXValue(sqlite3_context *ctx); import_dart("function_forget") extern void dartForgetAboutFunction(void *ptr); -import_dart("function_update_hook") extern void dartUpdateHook(void *id, int kind, - const char *db, - const char *table, - sqlite3_int64 rowid); +import_dart("function_hook") extern void dartUpdateHook(void *id, int kind, + const char *db, + const char *table, + sqlite3_int64 rowid); import_dart("function_commit_hook") extern int dartCommitHook(void *id); import_dart("function_rollback_hook") extern void dartRollbackHook(void *id); import_dart("function_compare") extern int dartXCompare(void *id, int lengthA, diff --git a/sqlite3/lib/src/database.dart b/sqlite3/lib/src/database.dart index 720ecbc3..3cf8f1bc 100644 --- a/sqlite3/lib/src/database.dart +++ b/sqlite3/lib/src/database.dart @@ -67,9 +67,26 @@ abstract class CommonDatabase { VoidPredicate? get commitFilter; set commitFilter(VoidPredicate? commitFilter); + /// An async stream that fires after each commit. + /// + /// Listening to this stream will register a "commit hook" on the native + /// database. Each commit that sqlite3 reports through that hook will then + /// be added to the stream. + /// + /// Note that the stream reports updates _asynchronously_, e.g. one event + /// loop iteration after sqlite reports them. + /// + /// Also note this works in conjunction with `commitFilter`. If the filter + /// function is not null and returns `false`, the commit will not occur and + /// this stream will not fire. + /// + /// See also: + /// - [Commit Hooks](https://www.sqlite.org/c3ref/commit_hook.html) + Stream get commits; + /// An async stream that fires after each rollback. /// - /// Listening to this stream will register an "update hook" on the native + /// Listening to this stream will register a "rollback hook" on the native /// database. Each rollback that sqlite3 reports through that hook will then /// be added to the stream. /// diff --git a/sqlite3/lib/src/implementation/database.dart b/sqlite3/lib/src/implementation/database.dart index 497793c7..4a460e32 100644 --- a/sqlite3/lib/src/implementation/database.dart +++ b/sqlite3/lib/src/implementation/database.dart @@ -59,6 +59,7 @@ base class DatabaseImplementation implements CommonDatabase { final List> _updateListeners = []; final List> _rollbackListeners = []; + final List> _commitListeners = []; VoidPredicate? _commitFilter; @@ -227,7 +228,11 @@ base class DatabaseImplementation implements CommonDatabase { disposeFinalizer.detach(this); _isClosed = true; - for (final listener in _updateListeners) { + for (final listener in [ + ..._updateListeners, + ..._rollbackListeners, + ..._commitListeners + ]) { listener.close(); } database.sqlite3_update_hook(null); @@ -508,14 +513,82 @@ base class DatabaseImplementation implements CommonDatabase { ); } + @override + Stream get commits { + return Stream.multi( + (newListener) { + if (_isClosed) { + newListener.closeSync(); + return; + } + + void addCommitListener() { + final isFirstListener = _commitListeners.isEmpty; + _commitListeners.add(newListener); + + if (isFirstListener && _commitFilter == null) { + // Add native commit hook + _updateCommitFilter(); + } + } + + void removeCommitListener() { + _commitListeners.remove(newListener); + + if (_commitListeners.isEmpty && _commitFilter == null && !_isClosed) { + _updateCommitFilter(); + } + } + + newListener + ..onPause = removeCommitListener + ..onCancel = removeCommitListener + ..onResume = addCommitListener; + + // Since this is a onListen callback, add listener now + addCommitListener(); + }, + isBroadcast: true, + ); + } + + void _updateCommitFilter() { + // update the commit filter to call both `commitFilter` and all `commit` + // listeners. This should be called after commitFilter changes, or when + // commitFilter is null and the number of listeners either goes to zero + // or changes from zero to one. + final commitFilter = _commitFilter; + if (commitFilter != null) { + database.sqlite3_commit_hook(() { + final complete = commitFilter(); + if (complete) { + for (final listener in _commitListeners) { + listener.add(null); + } + } + return complete ? 0 : 1; + }); + } else if (_commitListeners.isNotEmpty) { + database.sqlite3_commit_hook(() { + for (final listener in _commitListeners) { + listener.add(null); + } + return 0; + }); + } else { + database.sqlite3_commit_hook(null); + } + } + @override VoidPredicate? get commitFilter => _commitFilter; @override set commitFilter(VoidPredicate? commitFilter) { - _commitFilter = commitFilter; - database.sqlite3_commit_hook( - commitFilter == null ? null : () => commitFilter() ? 0 : 1); + if (_commitFilter != commitFilter) { + _commitFilter = commitFilter; + _updateCommitFilter(); + } } } diff --git a/sqlite3/lib/src/wasm/wasm_interop.dart b/sqlite3/lib/src/wasm/wasm_interop.dart index 40627001..dbbf03fe 100644 --- a/sqlite3/lib/src/wasm/wasm_interop.dart +++ b/sqlite3/lib/src/wasm/wasm_interop.dart @@ -731,7 +731,7 @@ class _InjectedValues { return callbacks.functions[ctx]!.collation!(aStr, bStr); }).toJS, - 'function_update_hook': + 'function_hook': ((int id, int kind, Pointer _, Pointer table, JSBigInt rowId) { final tableName = memory.readString(table); diff --git a/sqlite3/test/common/database.dart b/sqlite3/test/common/database.dart index 388ac16e..b9f18545 100644 --- a/sqlite3/test/common/database.dart +++ b/sqlite3/test/common/database.dart @@ -799,6 +799,40 @@ void testDatabase( }); }); + group('commit stream', () { + setUp(() { + database.commitFilter = null; + database.execute('CREATE TABLE tbl (a TEXT, b INT);'); + }); + + test('emits on implicit commit', () { + expect(database.commits, emits(isA())); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + }); + + test('emits on explicit commit', () { + expect(database.commits, emits(isA())); + + database.execute('BEGIN TRANSACTION;'); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + database.execute("COMMIT;"); + }); + + test('does not emit on implicit commit with commitFilter false', () async { + final future = expectLater( + database.commits + .timeout(Duration(seconds: 2), onTimeout: (sink) => sink.close()), + neverEmits(isA())); + database.commitFilter = () => false; + try { + database.execute("INSERT INTO tbl VALUES ('', 1);"); + } on SqliteException { + // ignore + } + await future; + }); + }); + group('unicode handling', () { test('accents in statements', () { final table = 'télé'; // with accent diff --git a/sqlite3_web/lib/src/client.dart b/sqlite3_web/lib/src/client.dart index f9a10b95..8f721d4c 100644 --- a/sqlite3_web/lib/src/client.dart +++ b/sqlite3_web/lib/src/client.dart @@ -19,14 +19,17 @@ final class RemoteDatabase implements Database { var _isClosed = false; - StreamSubscription? _notificationSubscription; + StreamSubscription? _updateNotificationSubscription; final StreamController _updates = StreamController.broadcast(); + StreamSubscription? _rollbackNotificationSubscription; final StreamController _rollbacks = StreamController.broadcast(); + StreamSubscription? _commitNotificationSubscription; + final StreamController _commits = StreamController.broadcast(); RemoteDatabase({required this.connection, required this.databaseId}) { _updates ..onListen = (() { - _notificationSubscription ??= + _updateNotificationSubscription ??= connection.notifications.stream.listen((notification) { if (notification case UpdateNotification()) { if (notification.databaseId == databaseId) { @@ -34,17 +37,49 @@ final class RemoteDatabase implements Database { } } }); - _requestUpdates(true); - _requestRollbacks(true); }) ..onCancel = (() { - _notificationSubscription?.cancel(); - _notificationSubscription = null; - + _updateNotificationSubscription?.cancel(); + _updateNotificationSubscription = null; _requestUpdates(false); + }); + + _rollbacks + ..onListen = (() { + _rollbackNotificationSubscription ??= + connection.notifications.stream.listen((notification) { + if (notification case RollbackNotification()) { + if (notification.databaseId == databaseId) { + _rollbacks.add(null); + } + } + }); + _requestRollbacks(true); + }) + ..onCancel = (() { + _rollbackNotificationSubscription?.cancel(); + _rollbackNotificationSubscription = null; _requestRollbacks(false); }); + + _commits + ..onListen = (() { + _commitNotificationSubscription ??= + connection.notifications.stream.listen((notification) { + if (notification case CommitNotification()) { + if (notification.databaseId == databaseId) { + _commits.add(null); + } + } + }); + _requestCommits(true); + }) + ..onCancel = (() { + _commitNotificationSubscription?.cancel(); + _commitNotificationSubscription = null; + _requestCommits(false); + }); } void _requestUpdates(bool sendUpdates) { @@ -67,6 +102,16 @@ final class RemoteDatabase implements Database { } } + void _requestCommits(bool sendCommits) { + if (!_isClosed) { + connection.sendRequest( + CommitStreamRequest( + action: sendCommits, requestId: 2, databaseId: databaseId), + MessageType.simpleSuccessResponse, + ); + } + } + @override Future get closed { return connection.closed; @@ -75,10 +120,14 @@ final class RemoteDatabase implements Database { @override Future dispose() async { _isClosed = true; - _updates.close(); - await connection.sendRequest( - CloseDatabase(requestId: 0, databaseId: databaseId), - MessageType.simpleSuccessResponse); + await ( + _updates.close(), + _rollbacks.close(), + _commits.close(), + connection.sendRequest( + CloseDatabase(requestId: 0, databaseId: databaseId), + MessageType.simpleSuccessResponse) + ).wait; } @override @@ -142,6 +191,9 @@ final class RemoteDatabase implements Database { @override Stream get rollbacks => _rollbacks.stream; + @override + Stream get commits => _commits.stream; + @override Future get userVersion async { final result = await select('pragma user_version;'); diff --git a/sqlite3_web/lib/src/database.dart b/sqlite3_web/lib/src/database.dart index 722cd097..d9256ac2 100644 --- a/sqlite3_web/lib/src/database.dart +++ b/sqlite3_web/lib/src/database.dart @@ -56,6 +56,12 @@ abstract class Database { /// stream is active. Stream get rollbacks; + /// A relayed stream of events triggered by commits from the remote worker. + /// + /// Updates are only sent across worker channels while a subscription to this + /// stream is active. + Stream get commits; + /// A future that resolves when the database is closed. /// /// Typically, databases are closed because [dispose] is called. For databases diff --git a/sqlite3_web/lib/src/protocol.dart b/sqlite3_web/lib/src/protocol.dart index 044a7324..e758ad84 100644 --- a/sqlite3_web/lib/src/protocol.dart +++ b/sqlite3_web/lib/src/protocol.dart @@ -27,6 +27,7 @@ enum MessageType { startFileSystemServer(), updateRequest(), rollbackRequest(), + commitRequest(), simpleSuccessResponse(), rowsResponse(), errorResponse(), @@ -34,6 +35,8 @@ enum MessageType { closeDatabase(), openAdditionalConnection(), notifyUpdate(), + notifyRollback(), + notifyCommit(), ; static final Map byName = values.asNameMap(); @@ -96,12 +99,15 @@ sealed class Message { OpenAdditonalConnection.deserialize(object), MessageType.updateRequest => UpdateStreamRequest.deserialize(object), MessageType.rollbackRequest => RollbackStreamRequest.deserialize(object), + MessageType.commitRequest => CommitStreamRequest.deserialize(object), MessageType.simpleSuccessResponse => SimpleSuccessResponse.deserialize(object), MessageType.endpointResponse => EndpointResponse.deserialize(object), MessageType.rowsResponse => RowsResponse.deserialize(object), MessageType.errorResponse => ErrorResponse.deserialize(object), MessageType.notifyUpdate => UpdateNotification.deserialize(object), + MessageType.notifyRollback => RollbackNotification.deserialize(object), + MessageType.notifyCommit => CommitNotification.deserialize(object), }; } @@ -693,6 +699,37 @@ final class RollbackStreamRequest extends Request { } } +final class CommitStreamRequest extends Request { + /// When true, the client is requesting to be informed about rollbacks + /// happening on the database identified by this request. + /// + /// When false, the client is requesting to no longer be informed about these + /// updates. + final bool action; + + CommitStreamRequest( + {required this.action, + required super.requestId, + required super.databaseId}); + + factory CommitStreamRequest.deserialize(JSObject object) { + return CommitStreamRequest( + action: (object[_UniqueFieldNames.action] as JSBoolean).toDart, + requestId: object.requestId, + databaseId: object.databaseId, + ); + } + + @override + MessageType get type => MessageType.rollbackRequest; + + @override + void serialize(JSObject object, List transferred) { + super.serialize(object, transferred); + object[_UniqueFieldNames.action] = action.toJS; + } +} + class CompatibilityCheck extends Request { @override final MessageType type; @@ -870,6 +907,27 @@ final class RollbackNotification extends Notification { } } +final class CommitNotification extends Notification { + final int databaseId; + + CommitNotification({required this.databaseId}); + + factory CommitNotification.deserialize(JSObject object) { + return CommitNotification( + databaseId: object.databaseId, + ); + } + + @override + MessageType get type => MessageType.notifyCommit; + + @override + void serialize(JSObject object, List transferred) { + super.serialize(object, transferred); + object[_UniqueFieldNames.databaseId] = databaseId.toJS; + } +} + extension on JSObject { int get requestId { return (this[_UniqueFieldNames.id] as JSNumber).toDartInt; diff --git a/sqlite3_web/lib/src/worker.dart b/sqlite3_web/lib/src/worker.dart index 720c6cb1..8196a811 100644 --- a/sqlite3_web/lib/src/worker.dart +++ b/sqlite3_web/lib/src/worker.dart @@ -112,6 +112,7 @@ final class _ConnectionDatabase { StreamSubscription? updates; StreamSubscription? rollbacks; + StreamSubscription? commits; _ConnectionDatabase(this.database, [int? id]) : id = id ?? database.id; @@ -247,6 +248,23 @@ final class _ClientConnection extends ProtocolChannel } return SimpleSuccessResponse( response: null, requestId: request.requestId); + case CommitStreamRequest(action: true): + if (database!.commits == null) { + final rawDatabase = await database.database.opened; + database.commits ??= rawDatabase.database.commits.listen((_) { + sendNotification( + RollbackNotification(databaseId: database.database.id)); + }); + } + return SimpleSuccessResponse( + response: null, requestId: request.requestId); + case CommitStreamRequest(action: false): + if (database!.commits != null) { + database.commits?.cancel(); + database.commits = null; + } + return SimpleSuccessResponse( + response: null, requestId: request.requestId); case OpenAdditonalConnection(): final database = _databaseFor(request)!.database; database.refCount++; From 207863853bc98653c7cff4896f2a6fd6ff52ea00 Mon Sep 17 00:00:00 2001 From: Dominic Jack Date: Sat, 18 Jan 2025 18:56:40 +1000 Subject: [PATCH 03/11] clang formatting --- sqlite3/assets/sqlite3.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlite3/assets/sqlite3.h b/sqlite3/assets/sqlite3.h index 82dfdd99..62366b2c 100644 --- a/sqlite3/assets/sqlite3.h +++ b/sqlite3/assets/sqlite3.h @@ -35,8 +35,8 @@ void *sqlite3_update_hook(sqlite3 *, void (*)(void *, int, sqlite3_char const *, sqlite3_char const *, int64_t), void *); -void *sqlite3_commit_hook(sqlite3*, int(*)(void *), void*); -void *sqlite3_rollback_hook(sqlite3*, void(*)(void *), void*); +void *sqlite3_commit_hook(sqlite3 *, int (*)(void *), void *); +void *sqlite3_rollback_hook(sqlite3 *, void (*)(void *), void *); int sqlite3_get_autocommit(sqlite3 *db); // Statements From c65f53d33418acd5d5c6821df1a73c12786605ef Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 18:55:01 +0100 Subject: [PATCH 04/11] Use shared code for all stream implementations --- sqlite3/lib/src/implementation/database.dart | 344 +++++++++---------- 1 file changed, 164 insertions(+), 180 deletions(-) diff --git a/sqlite3/lib/src/implementation/database.dart b/sqlite3/lib/src/implementation/database.dart index 4a460e32..ef7149a2 100644 --- a/sqlite3/lib/src/implementation/database.dart +++ b/sqlite3/lib/src/implementation/database.dart @@ -57,11 +57,60 @@ base class DatabaseImplementation implements CommonDatabase { final FinalizableDatabase finalizable; - final List> _updateListeners = []; - final List> _rollbackListeners = []; - final List> _commitListeners = []; + late final _StreamHandlers _updates = + _StreamHandlers( + database: this, + register: () { + database.sqlite3_update_hook((kind, tableName, rowId) { + SqliteUpdateKind updateKind; + + switch (kind) { + case SQLITE_INSERT: + updateKind = SqliteUpdateKind.insert; + break; + case SQLITE_UPDATE: + updateKind = SqliteUpdateKind.update; + break; + case SQLITE_DELETE: + updateKind = SqliteUpdateKind.delete; + break; + default: + return; + } + + final update = SqliteUpdate(updateKind, tableName, rowId); + _updates.deliverAsyncEvent(update); + }); + }, + unregister: () => database.sqlite3_update_hook(null), + ); + late final _StreamHandlers _rollbacks = + _StreamHandlers( + database: this, + register: () => database.sqlite3_rollback_hook(() { + _rollbacks.deliverAsyncEvent(null); + }), + unregister: () => database.sqlite3_rollback_hook(null), + ); + late final _StreamHandlers _commits = _StreamHandlers( + database: this, + register: () => database.sqlite3_commit_hook(() { + var complete = true; + if (_commits.syncCallback case final callback?) { + complete = callback(); + } + + if (complete) { + _commits.deliverAsyncEvent(null); + // There's no reason to deliver a rollback event if the synchronous + // handler determined that the transaction should be reverted, sqlite3 + // will emit a rollbacke event for us. + } - VoidPredicate? _commitFilter; + return complete ? 0 : 1; + }), + unregister: () => database.sqlite3_commit_hook(null), + ); var _isClosed = false; @@ -228,13 +277,10 @@ base class DatabaseImplementation implements CommonDatabase { disposeFinalizer.detach(this); _isClosed = true; - for (final listener in [ - ..._updateListeners, - ..._rollbackListeners, - ..._commitListeners - ]) { - listener.close(); - } + _updates.close(); + _commits.close(); + _rollbacks.close(); + database.sqlite3_update_hook(null); database.sqlite3_commit_hook(null); database.sqlite3_rollback_hook(null); @@ -411,184 +457,20 @@ base class DatabaseImplementation implements CommonDatabase { } @override - Stream get updates { - return Stream.multi( - (newListener) { - if (_isClosed) { - newListener.closeSync(); - return; - } - - void addUpdateListener() { - final isFirstListener = _updateListeners.isEmpty; - _updateListeners.add(newListener); - - if (isFirstListener) { - // Add native update hook - database.sqlite3_update_hook((kind, tableName, rowId) { - SqliteUpdateKind updateKind; - - switch (kind) { - case SQLITE_INSERT: - updateKind = SqliteUpdateKind.insert; - break; - case SQLITE_UPDATE: - updateKind = SqliteUpdateKind.update; - break; - case SQLITE_DELETE: - updateKind = SqliteUpdateKind.delete; - break; - default: - return; - } - - final update = SqliteUpdate(updateKind, tableName, rowId); - for (final listener in _updateListeners) { - listener.add(update); - } - }); - } - } - - void removeUpdateListener() { - _updateListeners.remove(newListener); - - if (_updateListeners.isEmpty && !_isClosed) { - database.sqlite3_update_hook(null); // Remove native hook - } - } - - newListener - ..onPause = removeUpdateListener - ..onCancel = removeUpdateListener - ..onResume = addUpdateListener; - - // Since this is a onListen callback, add listener now - addUpdateListener(); - }, - isBroadcast: true, - ); - } + Stream get updates => _updates.stream; @override - Stream get rollbacks { - return Stream.multi( - (newListener) { - if (_isClosed) { - newListener.closeSync(); - return; - } - - void addRollbackListener() { - final isFirstListener = _rollbackListeners.isEmpty; - _rollbackListeners.add(newListener); - - if (isFirstListener) { - // Add native rollback hook - database.sqlite3_rollback_hook(() { - for (final listener in _rollbackListeners) { - listener.add(null); - } - }); - } - } - - void removeRollbackListener() { - _rollbackListeners.remove(newListener); - - if (_rollbackListeners.isEmpty && !_isClosed) { - database.sqlite3_rollback_hook(null); // Remove native hook - } - } - - newListener - ..onPause = removeRollbackListener - ..onCancel = removeRollbackListener - ..onResume = addRollbackListener; - - // Since this is a onListen callback, add listener now - addRollbackListener(); - }, - isBroadcast: true, - ); - } + Stream get rollbacks => _rollbacks.stream; @override - Stream get commits { - return Stream.multi( - (newListener) { - if (_isClosed) { - newListener.closeSync(); - return; - } - - void addCommitListener() { - final isFirstListener = _commitListeners.isEmpty; - _commitListeners.add(newListener); - - if (isFirstListener && _commitFilter == null) { - // Add native commit hook - _updateCommitFilter(); - } - } - - void removeCommitListener() { - _commitListeners.remove(newListener); - - if (_commitListeners.isEmpty && _commitFilter == null && !_isClosed) { - _updateCommitFilter(); - } - } - - newListener - ..onPause = removeCommitListener - ..onCancel = removeCommitListener - ..onResume = addCommitListener; - - // Since this is a onListen callback, add listener now - addCommitListener(); - }, - isBroadcast: true, - ); - } - - void _updateCommitFilter() { - // update the commit filter to call both `commitFilter` and all `commit` - // listeners. This should be called after commitFilter changes, or when - // commitFilter is null and the number of listeners either goes to zero - // or changes from zero to one. - final commitFilter = _commitFilter; - if (commitFilter != null) { - database.sqlite3_commit_hook(() { - final complete = commitFilter(); - if (complete) { - for (final listener in _commitListeners) { - listener.add(null); - } - } - return complete ? 0 : 1; - }); - } else if (_commitListeners.isNotEmpty) { - database.sqlite3_commit_hook(() { - for (final listener in _commitListeners) { - listener.add(null); - } - return 0; - }); - } else { - database.sqlite3_commit_hook(null); - } - } + Stream get commits => _commits.stream; @override - VoidPredicate? get commitFilter => _commitFilter; + VoidPredicate? get commitFilter => _commits.syncCallback; @override set commitFilter(VoidPredicate? commitFilter) { - if (_commitFilter != commitFilter) { - _commitFilter = commitFilter; - _updateCommitFilter(); - } + _commits.syncCallback = commitFilter; } } @@ -696,3 +578,105 @@ final class DatabaseConfigImplementation extends DatabaseConfig { } } } + +/// A shared implementation for the [CommonDatabase.updates], +/// [CommonDatabase.commits] and [CommonDatabase.rollbacks] streams used by +/// [DatabaseImplementation]. +/// +/// [T] is the event type of the stream. These streams wrap SQLite callbacks +/// which are not supposed to make their own database calls. Thus, all streams +/// have an asynchronous delay from when the C callback is called. +/// The commits stream also supports a synchronous callback that can turn +/// commits into rollbacks. This is represented by [_syncCallback]. +final class _StreamHandlers { + final DatabaseImplementation _database; + final List> _asyncListeners = []; + SyncCallback? _syncCallback; + + /// Registers a native callback on the database. + final void Function() _register; + + /// Unregisters the native callback on the database. + final void Function() _unregister; + + late final Stream stream = Stream.multi( + (newListener) { + if (_database._isClosed) { + newListener.close(); + return; + } + + void addListener() { + _addAsyncListener(newListener); + } + + void removeListener() { + _removeAsyncListener(newListener); + } + + newListener + ..onPause = removeListener + ..onCancel = removeListener + ..onResume = addListener; + // Since this is a onListen callback, add listener now + addListener(); + }, + isBroadcast: true, + ); + + _StreamHandlers({ + required DatabaseImplementation database, + required void Function() register, + required void Function() unregister, + }) : _database = database, + _register = register, + _unregister = unregister; + + bool get hasListener => _asyncListeners.isNotEmpty || _syncCallback != null; + + SyncCallback? get syncCallback => _syncCallback; + + set syncCallback(SyncCallback? value) { + if (value != _syncCallback) { + final hadListenerBefore = hasListener; + _syncCallback = value; + final hasListenerNow = hasListener; + + if (!hadListenerBefore && hasListenerNow) { + _register(); + } else if (hadListenerBefore && !hasListenerNow) { + _unregister(); + } + } + } + + void _addAsyncListener(MultiStreamController listener) { + final isFirstListener = !hasListener; + _asyncListeners.add(listener); + + if (isFirstListener) { + _register(); + } + } + + void _removeAsyncListener(MultiStreamController listener) { + _asyncListeners.remove(listener); + + if (!hasListener && !_database._isClosed) { + _unregister(); + } + } + + void deliverAsyncEvent(T event) { + for (final listener in _asyncListeners) { + listener.add(event); + } + } + + void close() { + for (final listener in _asyncListeners) { + listener.close(); + } + _syncCallback = null; + } +} From c0384787d2ae02d08c12088e259c97723960faf1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 18:55:31 +0100 Subject: [PATCH 05/11] Bump version --- .github/workflows/main.yml | 4 ++-- sqlite3/CHANGELOG.md | 5 +++++ sqlite3/assets/wasm/CMakeLists.txt | 2 +- sqlite3/pubspec.yaml | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index acf2d15b..d60f9d68 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -209,8 +209,8 @@ jobs: - name: Web tests run: | - curl https://simon-public.fsn1.your-objectstorage.com/assets/sqlite3/2.6.0/sqlite3.wasm -o example/web/sqlite3.wasm - curl https://simon-public.fsn1.your-objectstorage.com/assets/sqlite3/2.6.0/sqlite3mc.wasm -o example/web/sqlite3mc.wasm + curl https://simon-public.fsn1.your-objectstorage.com/assets/sqlite3/2.7.0/sqlite3.wasm -o example/web/sqlite3.wasm + curl https://simon-public.fsn1.your-objectstorage.com/assets/sqlite3/2.7.0/sqlite3mc.wasm -o example/web/sqlite3mc.wasm dart test -P web -r expanded # If browsers behave differently on different platforms, surely that's not our fault... # So, only run browser tests on Linux to be faster. diff --git a/sqlite3/CHANGELOG.md b/sqlite3/CHANGELOG.md index 98c96316..914dc790 100644 --- a/sqlite3/CHANGELOG.md +++ b/sqlite3/CHANGELOG.md @@ -1,3 +1,8 @@ +## 2.7.0-dev + +- Add support for commit and rollback hooks as well as a predicate that can + revert transactions. + ## 2.6.1 - Fix out-of-bound reads in the `xWrite` implementation of the OPFS-locks based diff --git a/sqlite3/assets/wasm/CMakeLists.txt b/sqlite3/assets/wasm/CMakeLists.txt index 3e84aede..e4283b3a 100644 --- a/sqlite3/assets/wasm/CMakeLists.txt +++ b/sqlite3/assets/wasm/CMakeLists.txt @@ -32,7 +32,7 @@ add_custom_command( OUTPUT required_symbols.txt WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../ COMMAND dart run tool/wasm_symbols.dart ${CMAKE_CURRENT_BINARY_DIR}/required_symbols.txt - DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/../../tool/wasm_symbols.dart + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/../../tool/wasm_symbols.dart ${CMAKE_CURRENT_SOURCE_DIR}/../../lib/src/wasm/wasm_interop.dart VERBATIM ) add_custom_target(required_symbols DEPENDS required_symbols.txt) diff --git a/sqlite3/pubspec.yaml b/sqlite3/pubspec.yaml index f3e1dea5..94129541 100644 --- a/sqlite3/pubspec.yaml +++ b/sqlite3/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite3 description: Provides lightweight yet convenient bindings to SQLite by using dart:ffi -version: 2.6.1 +version: 2.7.0-dev homepage: https://github.com/simolus3/sqlite3.dart/tree/main/sqlite3 issue_tracker: https://github.com/simolus3/sqlite3.dart/issues From 132165085b7557122dbde7fa16635a154847e0f9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 18:55:45 +0100 Subject: [PATCH 06/11] Make wasm functions for commit hooks optional --- sqlite3/lib/src/wasm/wasm_interop.dart | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/sqlite3/lib/src/wasm/wasm_interop.dart b/sqlite3/lib/src/wasm/wasm_interop.dart index 8f9ff54d..2c714680 100644 --- a/sqlite3/lib/src/wasm/wasm_interop.dart +++ b/sqlite3/lib/src/wasm/wasm_interop.dart @@ -33,8 +33,6 @@ class WasmBindings { _register_vfs, _unregister_vfs, _update_hooks, - _commit_hooks, - _rollback_hooks, _sqlite3_libversion, _sqlite3_sourceid, _sqlite3_libversion_number, @@ -85,7 +83,13 @@ class WasmBindings { _sqlite3_stmt_readonly, _sqlite3_stmt_isexplain; - final JSFunction? _sqlite3_db_config, _sqlite3_initialize; + // The released WASM bundle only exposes functions referenced in this file. + // So, when we release a new version of `package:sqlite3` using additional + // functions, we can't assume that existing bundles also have those functions. + final JSFunction? _sqlite3_db_config, + _sqlite3_initialize, + _commit_hooks, + _rollback_hooks; final Global _sqlite3_temp_directory; @@ -105,8 +109,6 @@ class WasmBindings { _register_vfs = instance.functions['dart_sqlite3_register_vfs']!, _unregister_vfs = instance.functions['sqlite3_vfs_unregister']!, _update_hooks = instance.functions['dart_sqlite3_updates']!, - _commit_hooks = instance.functions['dart_sqlite3_commits']!, - _rollback_hooks = instance.functions['dart_sqlite3_rollbacks']!, _sqlite3_libversion = instance.functions['sqlite3_libversion']!, _sqlite3_sourceid = instance.functions['sqlite3_sourceid']!, _sqlite3_libversion_number = @@ -165,6 +167,8 @@ class WasmBindings { _sqlite3_stmt_readonly = instance.functions['sqlite3_stmt_readonly']!, _sqlite3_db_config = instance.functions['dart_sqlite3_db_config_int'], _sqlite3_initialize = instance.functions['sqlite3_initialize'], + _commit_hooks = instance.functions['dart_sqlite3_commits'], + _rollback_hooks = instance.functions['dart_sqlite3_rollbacks'], _sqlite3_temp_directory = instance.globals['sqlite3_temp_directory']! // Note when adding new fields: We remove functions from the wasm module that @@ -292,12 +296,12 @@ class WasmBindings { _update_hooks.callReturningVoid2(db.toJS, id.toJS); } - int dart_sqlite3_commits(Pointer db, int id) { - return _commit_hooks.callReturningInt2(db.toJS, id.toJS); + void dart_sqlite3_commits(Pointer db, int id) { + return _commit_hooks!.callReturningVoid2(db.toJS, id.toJS); } void dart_sqlite3_rollbacks(Pointer db, int id) { - return _rollback_hooks.callReturningVoid2(db.toJS, id.toJS); + return _rollback_hooks!.callReturningVoid2(db.toJS, id.toJS); } int sqlite3_exec(Pointer db, Pointer sql, Pointer callback, From 449cabdc32047bb68f9c98e3c084f0c1873c93a5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 18:55:55 +0100 Subject: [PATCH 07/11] Test rollback event due to filter --- sqlite3/test/common/database.dart | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sqlite3/test/common/database.dart b/sqlite3/test/common/database.dart index b9f18545..762d9649 100644 --- a/sqlite3/test/common/database.dart +++ b/sqlite3/test/common/database.dart @@ -738,6 +738,15 @@ void testDatabase( } database.execute("ROLLBACK;"); }); + + test('emits on rollback due to commit filter', () { + expect(database.rollbacks, emits(isA())); + database.commitFilter = expectAsync0(() => false); + + database.execute('begin'); + database.execute("INSERT INTO tbl VALUES ('', 1);"); + expect(() => database.execute('commit'), throwsSqlError(19, 531)); + }); }); group('commit filter', () { From 8e18f84efc4749c76465164a0f3ef0d8c460a625 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 18:59:17 +0100 Subject: [PATCH 08/11] Update expected version in wasm test --- sqlite3/test/wasm/sqlite3_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlite3/test/wasm/sqlite3_test.dart b/sqlite3/test/wasm/sqlite3_test.dart index 6f091db3..b4aa448c 100644 --- a/sqlite3/test/wasm/sqlite3_test.dart +++ b/sqlite3/test/wasm/sqlite3_test.dart @@ -45,7 +45,7 @@ void main() { expect( version, isA() - .having((e) => e.libVersion, 'libVersion', startsWith('3.47')), + .having((e) => e.libVersion, 'libVersion', startsWith('3.48')), ); }); From 9a43a061b12ecc099c9e9fd25feb43c2e54fce48 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 20:13:42 +0100 Subject: [PATCH 09/11] Avoid dart2js issue with late final fields --- sqlite3/lib/src/implementation/database.dart | 134 ++++++++++--------- 1 file changed, 72 insertions(+), 62 deletions(-) diff --git a/sqlite3/lib/src/implementation/database.dart b/sqlite3/lib/src/implementation/database.dart index ef7149a2..039bb0e2 100644 --- a/sqlite3/lib/src/implementation/database.dart +++ b/sqlite3/lib/src/implementation/database.dart @@ -57,60 +57,9 @@ base class DatabaseImplementation implements CommonDatabase { final FinalizableDatabase finalizable; - late final _StreamHandlers _updates = - _StreamHandlers( - database: this, - register: () { - database.sqlite3_update_hook((kind, tableName, rowId) { - SqliteUpdateKind updateKind; - - switch (kind) { - case SQLITE_INSERT: - updateKind = SqliteUpdateKind.insert; - break; - case SQLITE_UPDATE: - updateKind = SqliteUpdateKind.update; - break; - case SQLITE_DELETE: - updateKind = SqliteUpdateKind.delete; - break; - default: - return; - } - - final update = SqliteUpdate(updateKind, tableName, rowId); - _updates.deliverAsyncEvent(update); - }); - }, - unregister: () => database.sqlite3_update_hook(null), - ); - late final _StreamHandlers _rollbacks = - _StreamHandlers( - database: this, - register: () => database.sqlite3_rollback_hook(() { - _rollbacks.deliverAsyncEvent(null); - }), - unregister: () => database.sqlite3_rollback_hook(null), - ); - late final _StreamHandlers _commits = _StreamHandlers( - database: this, - register: () => database.sqlite3_commit_hook(() { - var complete = true; - if (_commits.syncCallback case final callback?) { - complete = callback(); - } - - if (complete) { - _commits.deliverAsyncEvent(null); - // There's no reason to deliver a rollback event if the synchronous - // handler determined that the transaction should be reverted, sqlite3 - // will emit a rollbacke event for us. - } - - return complete ? 0 : 1; - }), - unregister: () => database.sqlite3_commit_hook(null), - ); + _StreamHandlers? _updates; + _StreamHandlers? _rollbacks; + _StreamHandlers? _commits; var _isClosed = false; @@ -157,6 +106,67 @@ base class DatabaseImplementation implements CommonDatabase { } } + _StreamHandlers _updatesHandler() { + return _updates ??= _StreamHandlers( + database: this, + register: () { + database.sqlite3_update_hook((kind, tableName, rowId) { + SqliteUpdateKind updateKind; + + switch (kind) { + case SQLITE_INSERT: + updateKind = SqliteUpdateKind.insert; + break; + case SQLITE_UPDATE: + updateKind = SqliteUpdateKind.update; + break; + case SQLITE_DELETE: + updateKind = SqliteUpdateKind.delete; + break; + default: + return; + } + + final update = SqliteUpdate(updateKind, tableName, rowId); + _updates!.deliverAsyncEvent(update); + }); + }, + unregister: () => database.sqlite3_update_hook(null), + ); + } + + _StreamHandlers _rollbackHandler() { + return _rollbacks ??= _StreamHandlers( + database: this, + register: () => database.sqlite3_rollback_hook(() { + _rollbacks!.deliverAsyncEvent(null); + }), + unregister: () => database.sqlite3_rollback_hook(null), + ); + } + + _StreamHandlers _commitHandler() { + return _commits ??= _StreamHandlers( + database: this, + register: () => database.sqlite3_commit_hook(() { + var complete = true; + if (_commits!.syncCallback case final callback?) { + complete = callback(); + } + + if (complete) { + _commits!.deliverAsyncEvent(null); + // There's no reason to deliver a rollback event if the synchronous + // handler determined that the transaction should be reverted, sqlite3 + // will emit a rollbacke event for us. + } + + return complete ? 0 : 1; + }), + unregister: () => database.sqlite3_commit_hook(null), + ); + } + Uint8List _validateAndEncodeFunctionName(String functionName) { final functionNameBytes = utf8.encode(functionName); @@ -277,9 +287,9 @@ base class DatabaseImplementation implements CommonDatabase { disposeFinalizer.detach(this); _isClosed = true; - _updates.close(); - _commits.close(); - _rollbacks.close(); + _updates?.close(); + _commits?.close(); + _rollbacks?.close(); database.sqlite3_update_hook(null); database.sqlite3_commit_hook(null); @@ -457,20 +467,20 @@ base class DatabaseImplementation implements CommonDatabase { } @override - Stream get updates => _updates.stream; + Stream get updates => _updatesHandler().stream; @override - Stream get rollbacks => _rollbacks.stream; + Stream get rollbacks => _rollbackHandler().stream; @override - Stream get commits => _commits.stream; + Stream get commits => _commitHandler().stream; @override - VoidPredicate? get commitFilter => _commits.syncCallback; + VoidPredicate? get commitFilter => _commitHandler().syncCallback; @override set commitFilter(VoidPredicate? commitFilter) { - _commits.syncCallback = commitFilter; + _commitHandler().syncCallback = commitFilter; } } From 15820b86c35c899e518c56d0f8e6544a648fc9a1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 20:19:24 +0100 Subject: [PATCH 10/11] Avoid timeout in test --- sqlite3/test/common/database.dart | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sqlite3/test/common/database.dart b/sqlite3/test/common/database.dart index 762d9649..c0573ff6 100644 --- a/sqlite3/test/common/database.dart +++ b/sqlite3/test/common/database.dart @@ -713,14 +713,14 @@ void testDatabase( }); test('emits on rollback', () { - expect(database.rollbacks, emits(isA())); + expect(database.rollbacks, emits(anything)); database.execute('BEGIN TRANSACTION;'); database.execute("ROLLBACK;"); }); test('emits on rollback after insert', () { - expect(database.rollbacks, emits(isA())); + expect(database.rollbacks, emits(anything)); database.execute('BEGIN TRANSACTION;'); database.execute("INSERT INTO tbl VALUES ('', 1);"); @@ -728,7 +728,7 @@ void testDatabase( }); test('emits on rollback after erroneous SQL', () { - expect(database.rollbacks, emits(isA())); + expect(database.rollbacks, emits(anything)); database.execute('BEGIN TRANSACTION;'); try { @@ -740,7 +740,7 @@ void testDatabase( }); test('emits on rollback due to commit filter', () { - expect(database.rollbacks, emits(isA())); + expect(database.rollbacks, emits(anything)); database.commitFilter = expectAsync0(() => false); database.execute('begin'); @@ -815,12 +815,12 @@ void testDatabase( }); test('emits on implicit commit', () { - expect(database.commits, emits(isA())); + expect(database.commits, emits(anything)); database.execute("INSERT INTO tbl VALUES ('', 1);"); }); test('emits on explicit commit', () { - expect(database.commits, emits(isA())); + expect(database.commits, emits(anything)); database.execute('BEGIN TRANSACTION;'); database.execute("INSERT INTO tbl VALUES ('', 1);"); @@ -828,17 +828,17 @@ void testDatabase( }); test('does not emit on implicit commit with commitFilter false', () async { - final future = expectLater( - database.commits - .timeout(Duration(seconds: 2), onTimeout: (sink) => sink.close()), - neverEmits(isA())); + expect(database.commits, neverEmits(anything)); database.commitFilter = () => false; try { database.execute("INSERT INTO tbl VALUES ('', 1);"); } on SqliteException { // ignore } - await future; + + // Disposing the database here so that the stream closes and neverEmits + // completes. + database.dispose(); }); }); From 940880a054dc342e2bc114570cea7094503d88a3 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 18 Jan 2025 21:26:16 +0100 Subject: [PATCH 11/11] Add tests to sqlite3_web --- sqlite3_web/lib/src/client.dart | 100 +++++++----------- sqlite3_web/lib/src/protocol.dart | 141 ++++++------------------- sqlite3_web/lib/src/worker.dart | 112 +++++++++++--------- sqlite3_web/pubspec.yaml | 4 +- sqlite3_web/test/integration_test.dart | 14 ++- sqlite3_web/tool/server.dart | 8 +- sqlite3_web/web/main.dart | 14 ++- 7 files changed, 169 insertions(+), 224 deletions(-) diff --git a/sqlite3_web/lib/src/client.dart b/sqlite3_web/lib/src/client.dart index 172ad716..a4f89c76 100644 --- a/sqlite3_web/lib/src/client.dart +++ b/sqlite3_web/lib/src/client.dart @@ -14,6 +14,11 @@ import 'protocol.dart'; import 'shared.dart'; import 'worker.dart'; +final class _CommitOrRollbackStream { + StreamSubscription? workerSubscription; + final StreamController controller = StreamController.broadcast(); +} + final class RemoteDatabase implements Database { final WorkerConnection connection; final int databaseId; @@ -22,10 +27,9 @@ final class RemoteDatabase implements Database { StreamSubscription? _updateNotificationSubscription; final StreamController _updates = StreamController.broadcast(); - StreamSubscription? _rollbackNotificationSubscription; - final StreamController _rollbacks = StreamController.broadcast(); - StreamSubscription? _commitNotificationSubscription; - final StreamController _commits = StreamController.broadcast(); + + final _CommitOrRollbackStream _commits = _CommitOrRollbackStream(); + final _CommitOrRollbackStream _rollbacks = _CommitOrRollbackStream(); RemoteDatabase({required this.connection, required this.databaseId}) { _updates @@ -38,76 +42,54 @@ final class RemoteDatabase implements Database { } } }); - _requestUpdates(true); + _requestStreamUpdates(MessageType.updateRequest, true); }) ..onCancel = (() { _updateNotificationSubscription?.cancel(); _updateNotificationSubscription = null; - _requestUpdates(false); + _requestStreamUpdates(MessageType.updateRequest, false); }); - _rollbacks - ..onListen = (() { - _rollbackNotificationSubscription ??= - connection.notifications.stream.listen((notification) { - if (notification case RollbackNotification()) { - if (notification.databaseId == databaseId) { - _rollbacks.add(null); - } - } - }); - _requestRollbacks(true); - }) - ..onCancel = (() { - _rollbackNotificationSubscription?.cancel(); - _rollbackNotificationSubscription = null; - _requestRollbacks(false); - }); + _setupCommitOrRollbackStream( + _commits, MessageType.commitRequest, MessageType.notifyCommit); + _setupCommitOrRollbackStream( + _rollbacks, MessageType.rollbackRequest, MessageType.notifyRollback); + } - _commits + void _setupCommitOrRollbackStream( + _CommitOrRollbackStream stream, + MessageType requestSubscription, + MessageType notificationType, + ) { + stream.controller ..onListen = (() { - _commitNotificationSubscription ??= + stream.workerSubscription ??= connection.notifications.stream.listen((notification) { - if (notification case CommitNotification()) { - if (notification.databaseId == databaseId) { - _commits.add(null); + if (notification case EmptyNotification(type: final type)) { + if (notification.databaseId == databaseId && + type == notificationType) { + stream.controller.add(null); } } }); - _requestCommits(true); + _requestStreamUpdates(requestSubscription, true); }) ..onCancel = (() { - _commitNotificationSubscription?.cancel(); - _commitNotificationSubscription = null; - _requestCommits(false); + stream.workerSubscription?.cancel(); + stream.workerSubscription = null; + _requestStreamUpdates(requestSubscription, false); }); } - void _requestUpdates(bool sendUpdates) { + void _requestStreamUpdates(MessageType streamType, bool subscribe) { if (!_isClosed) { connection.sendRequest( - UpdateStreamRequest( - action: sendUpdates, requestId: 0, databaseId: databaseId), - MessageType.simpleSuccessResponse, - ); - } - } - - void _requestRollbacks(bool sendRollbacks) { - if (!_isClosed) { - connection.sendRequest( - RollbackStreamRequest( - action: sendRollbacks, requestId: 1, databaseId: databaseId), - MessageType.simpleSuccessResponse, - ); - } - } - - void _requestCommits(bool sendCommits) { - if (!_isClosed) { - connection.sendRequest( - CommitStreamRequest( - action: sendCommits, requestId: 2, databaseId: databaseId), + StreamRequest( + type: streamType, + action: subscribe, + requestId: 0, // filled out in sendRequest + databaseId: databaseId, + ), MessageType.simpleSuccessResponse, ); } @@ -123,8 +105,8 @@ final class RemoteDatabase implements Database { _isClosed = true; await ( _updates.close(), - _rollbacks.close(), - _commits.close(), + _rollbacks.controller.close(), + _commits.controller.close(), connection.sendRequest( CloseDatabase(requestId: 0, databaseId: databaseId), MessageType.simpleSuccessResponse) @@ -190,10 +172,10 @@ final class RemoteDatabase implements Database { Stream get updates => _updates.stream; @override - Stream get rollbacks => _rollbacks.stream; + Stream get rollbacks => _rollbacks.controller.stream; @override - Stream get commits => _commits.stream; + Stream get commits => _commits.controller.stream; @override Future get userVersion async { diff --git a/sqlite3_web/lib/src/protocol.dart b/sqlite3_web/lib/src/protocol.dart index e758ad84..8e249476 100644 --- a/sqlite3_web/lib/src/protocol.dart +++ b/sqlite3_web/lib/src/protocol.dart @@ -25,9 +25,9 @@ enum MessageType { fileSystemFlush(), connect(), startFileSystemServer(), - updateRequest(), - rollbackRequest(), - commitRequest(), + updateRequest(), + rollbackRequest(), + commitRequest(), simpleSuccessResponse(), rowsResponse(), errorResponse(), @@ -35,8 +35,8 @@ enum MessageType { closeDatabase(), openAdditionalConnection(), notifyUpdate(), - notifyRollback(), - notifyCommit(), + notifyRollback(), + notifyCommit(), ; static final Map byName = values.asNameMap(); @@ -97,17 +97,19 @@ sealed class Message { MessageType.closeDatabase => CloseDatabase.deserialize(object), MessageType.openAdditionalConnection => OpenAdditonalConnection.deserialize(object), - MessageType.updateRequest => UpdateStreamRequest.deserialize(object), - MessageType.rollbackRequest => RollbackStreamRequest.deserialize(object), - MessageType.commitRequest => CommitStreamRequest.deserialize(object), + MessageType.updateRequest || + MessageType.rollbackRequest || + MessageType.commitRequest => + StreamRequest.deserialize(type, object), MessageType.simpleSuccessResponse => SimpleSuccessResponse.deserialize(object), MessageType.endpointResponse => EndpointResponse.deserialize(object), MessageType.rowsResponse => RowsResponse.deserialize(object), MessageType.errorResponse => ErrorResponse.deserialize(object), MessageType.notifyUpdate => UpdateNotification.deserialize(object), - MessageType.notifyRollback => RollbackNotification.deserialize(object), - MessageType.notifyCommit => CommitNotification.deserialize(object), + MessageType.notifyRollback || + MessageType.notifyCommit => + EmptyNotification.deserialize(type, object), }; } @@ -637,7 +639,7 @@ final class ErrorResponse extends Response { } } -final class UpdateStreamRequest extends Request { +final class StreamRequest extends Request { /// When true, the client is requesting to be informed about updates happening /// on the database identified by this request. /// @@ -645,84 +647,25 @@ final class UpdateStreamRequest extends Request { /// updates. final bool action; - UpdateStreamRequest( - {required this.action, - required super.requestId, - required super.databaseId}); + final MessageType type; - factory UpdateStreamRequest.deserialize(JSObject object) { - return UpdateStreamRequest( - action: (object[_UniqueFieldNames.action] as JSBoolean).toDart, - requestId: object.requestId, - databaseId: object.databaseId, - ); - } - - @override - MessageType get type => MessageType.updateRequest; - - @override - void serialize(JSObject object, List transferred) { - super.serialize(object, transferred); - object[_UniqueFieldNames.action] = action.toJS; - } -} - -final class RollbackStreamRequest extends Request { - /// When true, the client is requesting to be informed about rollbacks - /// happening on the database identified by this request. - /// - /// When false, the client is requesting to no longer be informed about these - /// updates. - final bool action; - - RollbackStreamRequest( - {required this.action, - required super.requestId, - required super.databaseId}); - - factory RollbackStreamRequest.deserialize(JSObject object) { - return RollbackStreamRequest( - action: (object[_UniqueFieldNames.action] as JSBoolean).toDart, - requestId: object.requestId, - databaseId: object.databaseId, - ); - } - - @override - MessageType get type => MessageType.rollbackRequest; - - @override - void serialize(JSObject object, List transferred) { - super.serialize(object, transferred); - object[_UniqueFieldNames.action] = action.toJS; - } -} - -final class CommitStreamRequest extends Request { - /// When true, the client is requesting to be informed about rollbacks - /// happening on the database identified by this request. - /// - /// When false, the client is requesting to no longer be informed about these - /// updates. - final bool action; - - CommitStreamRequest( - {required this.action, - required super.requestId, - required super.databaseId}); + StreamRequest({ + required this.type, + required this.action, + required super.requestId, + required super.databaseId, + }); - factory CommitStreamRequest.deserialize(JSObject object) { - return CommitStreamRequest( + factory StreamRequest.deserialize( + MessageType type, JSObject object) { + return StreamRequest( + type: type, action: (object[_UniqueFieldNames.action] as JSBoolean).toDart, requestId: object.requestId, databaseId: object.databaseId, ); } - @override - MessageType get type => MessageType.rollbackRequest; - @override void serialize(JSObject object, List transferred) { super.serialize(object, transferred); @@ -886,41 +829,23 @@ final class UpdateNotification extends Notification { } } -final class RollbackNotification extends Notification { +/// Used as a notification without a payload, e.g. for commit or rollback +/// events. +final class EmptyNotification extends Notification { final int databaseId; - - RollbackNotification({required this.databaseId}); - - factory RollbackNotification.deserialize(JSObject object) { - return RollbackNotification( - databaseId: object.databaseId, - ); - } - - @override - MessageType get type => MessageType.notifyUpdate; - @override - void serialize(JSObject object, List transferred) { - super.serialize(object, transferred); - object[_UniqueFieldNames.databaseId] = databaseId.toJS; - } -} - -final class CommitNotification extends Notification { - final int databaseId; + final MessageType type; - CommitNotification({required this.databaseId}); + EmptyNotification({required this.type, required this.databaseId}); - factory CommitNotification.deserialize(JSObject object) { - return CommitNotification( + factory EmptyNotification.deserialize( + MessageType type, JSObject object) { + return EmptyNotification( + type: type, databaseId: object.databaseId, ); } - @override - MessageType get type => MessageType.notifyCommit; - @override void serialize(JSObject object, List transferred) { super.serialize(object, transferred); diff --git a/sqlite3_web/lib/src/worker.dart b/sqlite3_web/lib/src/worker.dart index 7d0a6b91..13f3f147 100644 --- a/sqlite3_web/lib/src/worker.dart +++ b/sqlite3_web/lib/src/worker.dart @@ -126,20 +126,30 @@ final class Local extends WorkerEnvironment { } } +class _StreamState { + StreamSubscription? subscription; + + void cancel() { + subscription?.cancel(); + subscription = null; + } +} + /// A database opened by a client. final class _ConnectionDatabase { final DatabaseState database; final int id; - StreamSubscription? updates; - StreamSubscription? rollbacks; - StreamSubscription? commits; + final _StreamState updates = _StreamState(); + final _StreamState rollbacks = _StreamState(); + final _StreamState commits = _StreamState(); _ConnectionDatabase(this.database, [int? id]) : id = id ?? database.id; Future close() async { - updates?.cancel(); - updates = null; + updates.cancel(); + rollbacks.cancel(); + commits.cancel(); await database.decrementRefCount(); } @@ -235,57 +245,34 @@ final class _ClientConnection extends ProtocolChannel return SimpleSuccessResponse( response: null, requestId: request.requestId); } - case UpdateStreamRequest(action: true): - if (database!.updates == null) { + case StreamRequest(action: true, type: MessageType.updateRequest): + return await subscribe(database!.updates, () async { final rawDatabase = await database.database.opened; - database.updates ??= rawDatabase.database.updates.listen((event) { + return rawDatabase.database.updates.listen((event) { sendNotification(UpdateNotification( update: event, databaseId: database.database.id)); }); - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); - case UpdateStreamRequest(action: false): - if (database!.updates != null) { - database.updates?.cancel(); - database.updates = null; - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); - case RollbackStreamRequest(action: true): - if (database!.rollbacks == null) { + }, request); + case StreamRequest(action: true, type: MessageType.commitRequest): + return await subscribe(database!.commits, () async { final rawDatabase = await database.database.opened; - database.rollbacks ??= rawDatabase.database.rollbacks.listen((_) { - sendNotification( - RollbackNotification(databaseId: database.database.id)); + return rawDatabase.database.commits.listen((event) { + sendNotification(EmptyNotification( + type: MessageType.notifyCommit, + databaseId: database.database.id)); }); - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); - case RollbackStreamRequest(action: false): - if (database!.rollbacks != null) { - database.rollbacks?.cancel(); - database.rollbacks = null; - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); - case CommitStreamRequest(action: true): - if (database!.commits == null) { + }, request); + case StreamRequest(action: true, type: MessageType.rollbackRequest): + return await subscribe(database!.rollbacks, () async { final rawDatabase = await database.database.opened; - database.commits ??= rawDatabase.database.commits.listen((_) { - sendNotification( - RollbackNotification(databaseId: database.database.id)); + return rawDatabase.database.rollbacks.listen((event) { + sendNotification(EmptyNotification( + type: MessageType.notifyRollback, + databaseId: database.database.id)); }); - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); - case CommitStreamRequest(action: false): - if (database!.commits != null) { - database.commits?.cancel(); - database.commits = null; - } - return SimpleSuccessResponse( - response: null, requestId: request.requestId); + }, request); + case StreamRequest(action: false): + return unsubscribe(database!, request); case OpenAdditonalConnection(): final database = _databaseFor(request)!.database; database.refCount++; @@ -339,9 +326,38 @@ final class _ClientConnection extends ProtocolChannel } finally { file.xClose(); } + case StreamRequest(action: true): + // Suppported stream requests handled in cases above. + return ErrorResponse( + message: 'Invalid stream subscription request', + requestId: request.requestId); } } + Future subscribe( + _StreamState state, + Future> Function() subscribeInternally, + StreamRequest request, + ) async { + state.subscription ??= await subscribeInternally(); + return SimpleSuccessResponse(response: null, requestId: request.requestId); + } + + Response unsubscribe(_ConnectionDatabase database, StreamRequest request) { + assert(!request.action); + final handler = switch (request.type) { + MessageType.updateRequest => database.updates, + MessageType.rollbackRequest => database.rollbacks, + MessageType.commitRequest => database.commits, + _ => throw AssertionError(), + }; + handler.cancel(); + + return SimpleSuccessResponse(response: null, requestId: request.requestId); + } + + void handleStreamCancelRequest() {} + @override void handleNotification(Notification notification) { // There aren't supposed to be any notifications from the client. diff --git a/sqlite3_web/pubspec.yaml b/sqlite3_web/pubspec.yaml index ef02eff0..eeb6c688 100644 --- a/sqlite3_web/pubspec.yaml +++ b/sqlite3_web/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite3_web description: Utilities to simplify accessing sqlite3 on the web, with automated feature detection. -version: 0.2.1 +version: 0.2.2-dev homepage: https://github.com/simolus3/sqlite3.dart/tree/main/sqlite3_web repository: https://github.com/simolus3/sqlite3.dart @@ -8,7 +8,7 @@ environment: sdk: ^3.3.0 dependencies: - sqlite3: ^2.6.0 + sqlite3: ^2.7.0-dev stream_channel: ^2.1.2 web: ^1.0.0 diff --git a/sqlite3_web/test/integration_test.dart b/sqlite3_web/test/integration_test.dart index b9c42cad..9474a329 100644 --- a/sqlite3_web/test/integration_test.dart +++ b/sqlite3_web/test/integration_test.dart @@ -159,13 +159,23 @@ void main() { await driver.assertFile(false); await driver.execute('CREATE TABLE foo (bar TEXT);'); - expect(await driver.countUpdateEvents(), 0); + var events = await driver.countEvents(); + expect(events.updates, 0); + expect(events.commits, 0); + expect(events.rollbacks, 0); await driver.execute("INSERT INTO foo (bar) VALUES ('hello');"); - expect(await driver.countUpdateEvents(), 1); + events = await driver.countEvents(); + expect(events.updates, 1); + expect(events.commits, 1); expect(await driver.assertFile(true), isPositive); await driver.flush(); + await driver.execute('begin'); + await driver.execute('rollback'); + events = await driver.countEvents(); + expect(events.rollbacks, 1); + if (storage != StorageMode.inMemory) { await driver.driver.refresh(); await driver.waitReady(); diff --git a/sqlite3_web/tool/server.dart b/sqlite3_web/tool/server.dart index 026a38c1..24e3fcf4 100644 --- a/sqlite3_web/tool/server.dart +++ b/sqlite3_web/tool/server.dart @@ -169,10 +169,14 @@ class TestWebDriver { await driver.executeAsync("close('', arguments[0])", []); } - Future countUpdateEvents() async { + Future<({int updates, int commits, int rollbacks})> countEvents() async { final result = await driver.executeAsync('get_updates("", arguments[0])', []); - return result as int; + return ( + updates: result[0] as int, + commits: result[1] as int, + rollbacks: result[2] as int, + ); } Future execute(String sql) async { diff --git a/sqlite3_web/web/main.dart b/sqlite3_web/web/main.dart index 6c8a4b6b..a6996747 100644 --- a/sqlite3_web/web/main.dart +++ b/sqlite3_web/web/main.dart @@ -1,10 +1,10 @@ import 'dart:convert'; -import 'dart:html'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; import 'dart:typed_data'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'package:web/web.dart'; import 'controller.dart'; @@ -16,6 +16,8 @@ WebSqlite? webSqlite; Database? database; int updates = 0; +int commits = 0; +int rollbacks = 0; bool listeningForUpdates = false; void main() { @@ -26,7 +28,11 @@ void main() { }); _addCallbackForWebDriver('get_updates', (arg) async { listenForUpdates(); - return updates.toJS; + return [ + updates.toJS, + commits.toJS, + rollbacks.toJS, + ].toJS; }); _addCallbackForWebDriver('open', (arg) => _open(arg, false)); _addCallbackForWebDriver('open_only_vfs', (arg) => _open(arg, true)); @@ -100,7 +106,7 @@ void main() { print('missing features: ${database.features.missingFeatures}'); }); - document.body!.children.add(DivElement()..id = 'ready'); + document.body!.appendChild(HTMLDivElement()..id = 'ready'); } void _addCallbackForWebDriver( @@ -179,6 +185,8 @@ void listenForUpdates() { if (!listeningForUpdates) { listeningForUpdates = true; database!.updates.listen((_) => updates++); + database!.commits.listen((_) => commits++); + database!.rollbacks.listen((_) => rollbacks++); } }