From cce864301fec4c761ef9b915dba285dc9f3bef2b Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 14 Feb 2024 17:24:35 +0200 Subject: [PATCH 01/10] add platforms to pubspec --- pubspec.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pubspec.yaml b/pubspec.yaml index bbce796..889e269 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -30,3 +30,11 @@ dev_dependencies: shelf_static: ^1.1.2 stream_channel: ^2.1.2 path: ^1.9.0 + +platforms: + android: + ios: + linux: + macos: + windows: + web: From 0de35bef2a4e9bcf5d3d76bb3f217dcda5b57083 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Tue, 20 Feb 2024 15:41:20 +0200 Subject: [PATCH 02/10] use open factories to return SqliteConnections --- lib/src/common/abstract_open_factory.dart | 37 ++- lib/src/impl/stub_sqlite_open_factory.dart | 8 + lib/src/native/database/connection_pool.dart | 22 +- .../database/native_sqlite_database.dart | 24 +- .../native/native_sqlite_open_factory.dart | 15 + lib/src/sqlite_connection.dart | 4 + .../connection/drift_sqlite_connection.dart | 259 ++++++++++++++++++ .../database/executor/drift_sql_executor.dart | 76 ----- .../database/executor/sqlite_executor.dart | 19 -- lib/src/web/database/web_db_context.dart | 96 ------- .../database/web_sqlite_connection_impl.dart | 144 ---------- lib/src/web/database/web_sqlite_database.dart | 14 +- .../web/web_isolate_connection_factory.dart | 10 +- lib/src/web/web_sqlite_open_factory.dart | 23 +- lib/src/web/worker/worker_utils.dart | 2 +- 15 files changed, 359 insertions(+), 394 deletions(-) create mode 100644 lib/src/web/database/connection/drift_sqlite_connection.dart delete mode 100644 lib/src/web/database/executor/drift_sql_executor.dart delete mode 100644 lib/src/web/database/executor/sqlite_executor.dart delete mode 100644 lib/src/web/database/web_db_context.dart delete mode 100644 lib/src/web/database/web_sqlite_connection_impl.dart diff --git a/lib/src/common/abstract_open_factory.dart b/lib/src/common/abstract_open_factory.dart index e3ffda0..760efb1 100644 --- a/lib/src/common/abstract_open_factory.dart +++ b/lib/src/common/abstract_open_factory.dart @@ -2,7 +2,11 @@ import 'dart:async'; import 'package:meta/meta.dart'; import 'package:sqlite_async/sqlite3_common.dart' as sqlite; +import 'package:sqlite_async/src/common/mutex.dart'; +import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; +import 'package:sqlite_async/src/update_notification.dart'; /// Factory to create new SQLite database connections. /// @@ -11,7 +15,11 @@ import 'package:sqlite_async/src/sqlite_options.dart'; abstract class SqliteOpenFactory { String get path; + /// Opens a direct connection to the SQLite database FutureOr open(SqliteOpenOptions options); + + /// Opens an asynchronous [SqliteConnection] + FutureOr openConnection(SqliteOpenOptions options); } class SqliteOpenOptions { @@ -21,8 +29,24 @@ class SqliteOpenOptions { /// Whether this connection is read-only. final bool readOnly; + /// Mutex to use in [SqliteConnection]s + final Mutex? mutex; + + /// Name used in debug logs + final String? debugName; + + final SerializedPortClient? upstreamPort; + + /// Stream of external update notifications + final Stream? updates; + const SqliteOpenOptions( - {required this.primaryConnection, required this.readOnly}); + {required this.primaryConnection, + required this.readOnly, + this.mutex, + this.debugName, + this.updates, + this.upstreamPort}); sqlite.OpenMode get openMode { if (primaryConnection) { @@ -55,9 +79,14 @@ abstract class AbstractDefaultSqliteOpenFactory< List pragmaStatements(SqliteOpenOptions options); @protected + + /// Opens a direct connection to a SQLite database connection FutureOr openDB(SqliteOpenOptions options); @override + + /// Opens a direct connection to a SQLite database connection + /// and executes setup pragma statements to initialize the DB FutureOr open(SqliteOpenOptions options) async { var db = await openDB(options); @@ -66,4 +95,10 @@ abstract class AbstractDefaultSqliteOpenFactory< } return db; } + + @override + + /// Opens an asynchronous [SqliteConnection] to a SQLite database + /// and executes setup pragma statements to initialize the DB + FutureOr openConnection(SqliteOpenOptions options); } diff --git a/lib/src/impl/stub_sqlite_open_factory.dart b/lib/src/impl/stub_sqlite_open_factory.dart index 437ff4b..1108752 100644 --- a/lib/src/impl/stub_sqlite_open_factory.dart +++ b/lib/src/impl/stub_sqlite_open_factory.dart @@ -1,5 +1,8 @@ +import 'dart:async'; + import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { @@ -16,4 +19,9 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { List pragmaStatements(SqliteOpenOptions options) { throw UnimplementedError(); } + + @override + FutureOr openConnection(SqliteOpenOptions options) { + throw UnimplementedError(); + } } diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 4f92f27..d46c702 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -117,21 +117,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { + {Duration? lockTimeout, String? debugContext}) async { if (closed) { throw AssertionError('Closed'); } if (_writeConnection?.closed == true) { _writeConnection = null; } - _writeConnection ??= SqliteConnectionImpl( - upstreamPort: _upstreamPort, - primary: false, - updates: updates, - debugName: debugName != null ? '$debugName-writer' : null, - mutex: mutex, - readOnly: false, - openFactory: _factory); + _writeConnection ??= await _openPrimaryConnection( + debugName: debugName != null ? '$debugName-writer' : null); return _runZoned(() { return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); @@ -192,4 +186,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { // read-only connections first. await _writeConnection?.close(); } + + FutureOr _openPrimaryConnection({String? debugName}) { + return _factory.openConnection(SqliteOpenOptions( + upstreamPort: _upstreamPort, + primaryConnection: true, + updates: updates, + debugName: debugName, + mutex: mutex, + readOnly: false)); + } } diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index fd258d2..a516c27 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -5,7 +5,6 @@ import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/database/connection_pool.dart'; -import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; import 'package:sqlite_async/src/native/native_sqlite_open_factory.dart'; @@ -33,11 +32,11 @@ class SqliteDatabaseImpl int maxReaders; @override - late Future isInitialized; + // Native doesn't require any asynchronous initialization + late Future isInitialized = Future.value(); late final PortServer _eventsPort; - late final SqliteConnectionImpl _internalConnection; late final SqliteConnectionPool _pool; /// Global lock to serialize write transactions. @@ -77,20 +76,12 @@ class SqliteDatabaseImpl _listenForEvents(); - _internalConnection = _openPrimaryConnection(debugName: 'sqlite-writer'); _pool = SqliteConnectionPool(openFactory, upstreamPort: _eventsPort.client(), updates: updates, - writeConnection: _internalConnection, debugName: 'sqlite', maxReaders: maxReaders, mutex: mutex); - - isInitialized = _init(); - } - - Future _init() async { - await _internalConnection.ready; } @override @@ -160,17 +151,6 @@ class SqliteDatabaseImpl upstreamPort: _eventsPort.client()); } - SqliteConnectionImpl _openPrimaryConnection({String? debugName}) { - return SqliteConnectionImpl( - upstreamPort: _eventsPort.client(), - primary: true, - updates: updates, - debugName: debugName, - mutex: mutex, - readOnly: false, - openFactory: openFactory); - } - @override Future close() async { await _pool.close(); diff --git a/lib/src/native/native_sqlite_open_factory.dart b/lib/src/native/native_sqlite_open_factory.dart index c243bd0..c350c3d 100644 --- a/lib/src/native/native_sqlite_open_factory.dart +++ b/lib/src/native/native_sqlite_open_factory.dart @@ -2,6 +2,8 @@ import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; +import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; /// Native implementation of [AbstractDefaultSqliteOpenFactory] @@ -37,4 +39,17 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { } return statements; } + + @override + SqliteConnection openConnection(SqliteOpenOptions options) { + return SqliteConnectionImpl( + primary: options.primaryConnection, + readOnly: options.readOnly, + mutex: options.mutex!, + upstreamPort: options.upstreamPort!, + debugName: options.debugName, + updates: options.updates, + openFactory: this, + ); + } } diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index 1ace5b8..2ff570e 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:sqlite3/common.dart' as sqlite; +import 'package:sqlite_async/src/update_notification.dart'; /// Abstract class representing calls available in a read-only or read-write context. abstract class SqliteReadContext { @@ -74,6 +75,9 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// Abstract class representing a connection to the SQLite database. abstract class SqliteConnection extends SqliteWriteContext { + /// Reports table change update notification + Stream? get updates; + /// Open a read-only transaction. /// /// Statements within the transaction must be done on the provided diff --git a/lib/src/web/database/connection/drift_sqlite_connection.dart b/lib/src/web/database/connection/drift_sqlite_connection.dart new file mode 100644 index 0000000..2ee7f0b --- /dev/null +++ b/lib/src/web/database/connection/drift_sqlite_connection.dart @@ -0,0 +1,259 @@ +import 'dart:async'; +import 'package:drift/drift.dart'; +import 'package:drift/remote.dart'; +import 'package:drift/wasm.dart'; +import 'package:meta/meta.dart'; +import 'package:sqlite_async/sqlite3_common.dart'; + +import 'package:sqlite_async/src/common/mutex.dart'; + +import 'package:sqlite_async/src/sqlite_connection.dart'; +import 'package:sqlite_async/src/sqlite_queries.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; + +/// Custom function which exposes CommonDatabase.autocommit +const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; + +class DriftSqliteConnection with SqliteQueries implements SqliteConnection { + WasmDatabaseResult db; + + @override + late Stream updates; + + final Mutex mutex; + + @override + bool closed = false; + + DriftSqliteConnection(this.db, this.mutex) { + // Pass on table updates + updates = db.resolvedExecutor.streamQueries + .updatesForSync(TableUpdateQuery.any()) + .map((tables) { + return UpdateNotification(tables.map((e) => e.table).toSet()); + }); + } + + @override + close() { + closed = true; + return db.resolvedExecutor.close(); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + try { + final result = await db.resolvedExecutor.runBatched(BatchedStatements( + [sql], + parameterSets + .map((e) => ArgumentsForBatchedStatement(0, e)) + .toList())); + return result; + } on DriftRemoteException catch (e) { + if (e.toString().contains('SqliteException')) { + // Drift wraps these in remote errors + throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); + } + rethrow; + } + } + + Future select(String sql, + [List parameters = const []]) async { + try { + final result = await db.resolvedExecutor.runSelect(sql, parameters); + if (result.isEmpty) { + return ResultSet([], [], []); + } + return ResultSet(result.first.keys.toList(), [], + result.map((e) => e.values.toList()).toList()); + } on DriftRemoteException catch (e) { + if (e.toString().contains('SqliteException')) { + // Drift wraps these in remote errors + throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); + } + rethrow; + } + } + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout, + String? debugContext, + bool isTransaction = false}) async { + return _runZoned( + () => mutex.lock(() async { + final context = + DriftReadContext(this, isTransaction: isTransaction); + try { + final result = await callback(context); + return result; + } finally { + context.close(); + } + }, timeout: lockTimeout), + debugContext: debugContext ?? 'execute()'); + } + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout, + String? debugContext, + bool isTransaction = false}) async { + return _runZoned( + () => mutex.lock(() async { + final context = + DriftWriteContext(this, isTransaction: isTransaction); + try { + final result = await callback(context); + return result; + } finally { + context.close(); + } + }, timeout: lockTimeout), + debugContext: debugContext ?? 'execute()'); + } + + @override + Future readTransaction( + Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout}) async { + return readLock((ctx) async { + return await internalReadTransaction(ctx, callback); + }, + lockTimeout: lockTimeout, + debugContext: 'readTransaction()', + isTransaction: true); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout}) async { + return writeLock(( + ctx, + ) async { + return await internalWriteTransaction(ctx, callback); + }, + lockTimeout: lockTimeout, + debugContext: 'writeTransaction()', + isTransaction: true); + } + + /// The mutex on individual connections do already error in recursive locks. + /// + /// We duplicate the same check here, to: + /// 1. Also error when the recursive transaction is handled by a different + /// connection (with a different lock). + /// 2. Give a more specific error message when it happens. + T _runZoned(T Function() callback, {required String debugContext}) { + if (Zone.current[this] != null) { + throw LockError( + 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); + } + var zone = Zone.current.fork(zoneValues: {this: true}); + return zone.run(callback); + } + + @override + Future getAutoCommit() async { + return DriftWriteContext(this).getAutoCommit(); + } +} + +class DriftReadContext implements SqliteReadContext { + DriftSqliteConnection db; + bool _closed = false; + + @protected + bool isTransaction; + + DriftReadContext(this.db, {this.isTransaction = false}); + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) { + throw UnimplementedError(); + } + + @override + Future get(String sql, [List parameters = const []]) async { + return (await getAll(sql, parameters)).first; + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + if (_closed) { + throw SqliteException(0, 'Transaction closed', null, sql); + } + return db.select(sql, parameters); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + return (await db.select(sql, parameters)).firstOrNull; + } + + @override + bool get closed => _closed; + + close() { + _closed = true; + } + + @override + Future getAutoCommit() async { + final response = await db.select('select $sqliteAsyncAutoCommitCommand()'); + if (response.isEmpty) { + return false; + } + + return response.first.values.first != 0; + } +} + +class DriftWriteContext extends DriftReadContext implements SqliteWriteContext { + DriftWriteContext(super.db, {super.isTransaction}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + return getAll(sql, parameters); + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + if (_closed) { + throw SqliteException(0, 'Transaction closed', null, sql); + } + + /// Statements in read/writeTransactions should not execute after ROLLBACK + if (isTransaction && + !sql.toLowerCase().contains('begin') && + await getAutoCommit()) { + throw SqliteException(0, + 'Transaction rolled back by earlier statement. Cannot execute: $sql'); + } + return db.select(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + return db.executeBatch(sql, parameterSets); + } +} + +class DriftSqliteUser extends QueryExecutorUser { + @override + Future beforeOpen( + QueryExecutor executor, OpeningDetails details) async {} + + @override + int get schemaVersion => 1; +} diff --git a/lib/src/web/database/executor/drift_sql_executor.dart b/lib/src/web/database/executor/drift_sql_executor.dart deleted file mode 100644 index 8911e41..0000000 --- a/lib/src/web/database/executor/drift_sql_executor.dart +++ /dev/null @@ -1,76 +0,0 @@ -import 'dart:async'; - -import 'package:drift/drift.dart'; -import 'package:drift/remote.dart'; -import 'package:drift/wasm.dart'; -import 'package:sqlite3/common.dart'; -import 'sqlite_executor.dart'; - -class DriftWebSQLExecutor extends SQLExecutor { - WasmDatabaseResult db; - - @override - bool closed = false; - - DriftWebSQLExecutor(this.db) { - // Pass on table updates - updateStream = db.resolvedExecutor.streamQueries - .updatesForSync(TableUpdateQuery.any()) - .map((tables) { - return tables.map((e) => e.table).toSet(); - }); - } - - @override - close() { - closed = true; - return db.resolvedExecutor.close(); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - try { - final result = await db.resolvedExecutor.runBatched(BatchedStatements( - [sql], - parameterSets - .map((e) => ArgumentsForBatchedStatement(0, e)) - .toList())); - return result; - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } - - @override - Future select(String sql, - [List parameters = const []]) async { - try { - final result = await db.resolvedExecutor.runSelect(sql, parameters); - if (result.isEmpty) { - return ResultSet([], [], []); - } - return ResultSet(result.first.keys.toList(), [], - result.map((e) => e.values.toList()).toList()); - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } -} - -class DriftSqliteUser extends QueryExecutorUser { - @override - Future beforeOpen( - QueryExecutor executor, OpeningDetails details) async {} - - @override - int get schemaVersion => 1; -} diff --git a/lib/src/web/database/executor/sqlite_executor.dart b/lib/src/web/database/executor/sqlite_executor.dart deleted file mode 100644 index c6192fa..0000000 --- a/lib/src/web/database/executor/sqlite_executor.dart +++ /dev/null @@ -1,19 +0,0 @@ -// Abstract class which provides base methods required for Context providers -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_common.dart'; - -/// Abstract class for providing basic SQLite operations -/// Specific DB implementations such as Drift can be adapted to -/// this interface -abstract class SQLExecutor { - bool get closed; - - Stream> updateStream = Stream.empty(); - - Future close(); - - FutureOr select(String sql, [List parameters = const []]); - - FutureOr executeBatch(String sql, List> parameterSets) {} -} diff --git a/lib/src/web/database/web_db_context.dart b/lib/src/web/database/web_db_context.dart deleted file mode 100644 index 1c6b47f..0000000 --- a/lib/src/web/database/web_db_context.dart +++ /dev/null @@ -1,96 +0,0 @@ -import 'dart:async'; - -import 'package:meta/meta.dart'; -import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'executor/sqlite_executor.dart'; - -/// Custom function which exposes CommonDatabase.autocommit -const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; - -class WebReadContext implements SqliteReadContext { - SQLExecutor db; - bool _closed = false; - - @protected - bool isTransaction; - - WebReadContext(this.db, {this.isTransaction = false}); - - @override - Future computeWithDatabase( - Future Function(CommonDatabase db) compute) { - throw UnimplementedError(); - } - - @override - Future get(String sql, [List parameters = const []]) async { - return (await getAll(sql, parameters)).first; - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - return db.select(sql, parameters); - } - - @override - Future getOptional(String sql, - [List parameters = const []]) async { - final rows = await getAll(sql, parameters); - return rows.isEmpty ? null : rows.first; - } - - @override - bool get closed => _closed; - - close() { - _closed = true; - } - - @override - Future getAutoCommit() async { - final response = await db.select('select $sqliteAsyncAutoCommitCommand()'); - if (response.isEmpty) { - return false; - } - - return response.first.values.first != 0; - } -} - -class WebWriteContext extends WebReadContext implements SqliteWriteContext { - WebWriteContext(super.db, {super.isTransaction}); - - @override - Future execute(String sql, - [List parameters = const []]) async { - return getAll(sql, parameters); - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - - /// Statements in read/writeTransactions should not execute after ROLLBACK - if (isTransaction && - !sql.toLowerCase().contains('begin') && - await getAutoCommit()) { - throw SqliteException(0, - 'Transaction rolled back by earlier statement. Cannot execute: $sql'); - } - return db.select(sql, parameters); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - return db.executeBatch(sql, parameterSets); - } -} diff --git a/lib/src/web/database/web_sqlite_connection_impl.dart b/lib/src/web/database/web_sqlite_connection_impl.dart deleted file mode 100644 index a9df6b2..0000000 --- a/lib/src/web/database/web_sqlite_connection_impl.dart +++ /dev/null @@ -1,144 +0,0 @@ -import 'dart:async'; -import 'package:meta/meta.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/common/mutex.dart'; - -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'package:sqlite_async/src/sqlite_queries.dart'; -import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; -import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; - -import 'executor/sqlite_executor.dart'; -import 'web_db_context.dart'; - -/// Web implementation of [SqliteConnection] -class WebSqliteConnectionImpl with SqliteQueries implements SqliteConnection { - @override - bool get closed { - return executor == null || executor!.closed; - } - - @override - late Stream updates; - - late final Mutex mutex; - DefaultSqliteOpenFactory openFactory; - - @protected - final StreamController updatesController = - StreamController.broadcast(); - - @protected - late SQLExecutor? executor; - - @protected - late Future isInitialized; - - WebSqliteConnectionImpl({required this.openFactory, required this.mutex}) { - updates = updatesController.stream; - isInitialized = _init(); - } - - Future _init() async { - executor = await openFactory.openExecutor( - SqliteOpenOptions(primaryConnection: true, readOnly: false)); - - executor!.updateStream.forEach((tables) { - updatesController.add(UpdateNotification(tables)); - }); - } - - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - await isInitialized; - return _runZoned( - () => mutex.lock(() async { - final context = - WebReadContext(executor!, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - await isInitialized; - return _runZoned( - () => mutex.lock(() async { - final context = - WebWriteContext(executor!, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future readTransaction( - Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) async { - return readLock((ctx) async { - return await internalReadTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'readTransaction()', - isTransaction: true); - } - - @override - Future writeTransaction( - Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) async { - return writeLock(( - ctx, - ) async { - return await internalWriteTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'writeTransaction()', - isTransaction: true); - } - - /// The mutex on individual connections do already error in recursive locks. - /// - /// We duplicate the same check here, to: - /// 1. Also error when the recursive transaction is handled by a different - /// connection (with a different lock). - /// 2. Give a more specific error message when it happens. - T _runZoned(T Function() callback, {required String debugContext}) { - if (Zone.current[this] != null) { - throw LockError( - 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); - } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); - } - - @override - Future close() async { - await isInitialized; - await executor!.close(); - } - - @override - Future getAutoCommit() async { - await isInitialized; - return WebWriteContext(executor!).getAutoCommit(); - } -} diff --git a/lib/src/web/database/web_sqlite_database.dart b/lib/src/web/database/web_sqlite_database.dart index 62e3911..3a16365 100644 --- a/lib/src/web/database/web_sqlite_database.dart +++ b/lib/src/web/database/web_sqlite_database.dart @@ -10,8 +10,6 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/web/web_mutex.dart'; import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; -import 'web_sqlite_connection_impl.dart'; - /// Web implementation of [SqliteDatabase] /// Uses a web worker for SQLite connection class SqliteDatabaseImpl @@ -35,8 +33,7 @@ class SqliteDatabaseImpl AbstractDefaultSqliteOpenFactory openFactory; late final Mutex mutex; - late final IsolateConnectionFactoryImpl _isolateConnectionFactory; - late final WebSqliteConnectionImpl _connection; + late final SqliteConnection _connection; /// Open a SqliteDatabase. /// @@ -70,14 +67,13 @@ class SqliteDatabaseImpl {this.maxReaders = SqliteDatabase.defaultMaxReaders}) { updates = updatesController.stream; mutex = MutexImpl(); - _isolateConnectionFactory = IsolateConnectionFactoryImpl( - openFactory: openFactory as DefaultSqliteOpenFactory, mutex: mutex); - _connection = _isolateConnectionFactory.open(); isInitialized = _init(); } Future _init() async { - _connection.updates.forEach((update) { + _connection = await openFactory.openConnection(SqliteOpenOptions( + primaryConnection: true, readOnly: false, mutex: mutex)); + _connection.updates?.forEach((update) { updatesController.add(update); }); } @@ -119,7 +115,7 @@ class SqliteDatabaseImpl @override IsolateConnectionFactoryImpl isolateConnectionFactory() { - return _isolateConnectionFactory; + throw UnimplementedError(); } @override diff --git a/lib/src/web/web_isolate_connection_factory.dart b/lib/src/web/web_isolate_connection_factory.dart index 68212c2..e1f46ae 100644 --- a/lib/src/web/web_isolate_connection_factory.dart +++ b/lib/src/web/web_isolate_connection_factory.dart @@ -1,12 +1,11 @@ import 'dart:async'; import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart'; -import 'database/web_sqlite_connection_impl.dart'; /// An implementation of [IsolateConnectionFactory] for Web /// This uses a web worker instead of an isolate @@ -28,8 +27,8 @@ class IsolateConnectionFactoryImpl /// /// This opens a single connection in a background execution isolate. @override - WebSqliteConnectionImpl open({String? debugName, bool readOnly = false}) { - return WebSqliteConnectionImpl(mutex: mutex, openFactory: openFactory); + SqliteConnection open({String? debugName, bool readOnly = false}) { + throw UnimplementedError(); } /// Opens a synchronous sqlite.Database directly in the current isolate. @@ -43,8 +42,7 @@ class IsolateConnectionFactoryImpl /// this connection. @override Future openRawDatabase({bool readOnly = false}) async { - return openFactory - .open(SqliteOpenOptions(primaryConnection: false, readOnly: readOnly)); + throw UnimplementedError(); } @override diff --git a/lib/src/web/web_sqlite_open_factory.dart b/lib/src/web/web_sqlite_open_factory.dart index 916d275..6f6ccb3 100644 --- a/lib/src/web/web_sqlite_open_factory.dart +++ b/lib/src/web/web_sqlite_open_factory.dart @@ -2,12 +2,9 @@ import 'dart:async'; import 'package:drift/wasm.dart'; import 'package:sqlite3/wasm.dart'; - -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/sqlite_options.dart'; - -import 'database/executor/drift_sql_executor.dart'; -import 'database/executor/sqlite_executor.dart'; +import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; +import 'package:sqlite_async/src/web/web_mutex.dart'; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -33,23 +30,27 @@ class DefaultSqliteOpenFactory return wasmSqlite.open(path); } - /// Returns a simple asynchronous SQLExecutor which can be used to implement - /// higher order functionality. + @override + /// Currently this only uses the Drift WASM implementation. /// The Drift SQLite package provides built in async Web worker functionality /// and automatic persistence storage selection. /// Due to being asynchronous, the under laying CommonDatabase is not accessible - Future openExecutor(SqliteOpenOptions options) async { + Future openConnection(SqliteOpenOptions options) async { final db = await WasmDatabase.open( databaseName: path, sqlite3Uri: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), driftWorkerUri: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), ); - final executor = DriftWebSQLExecutor(db); await db.resolvedExecutor.ensureOpen(DriftSqliteUser()); - return executor; + final connection = DriftSqliteConnection(db, options.mutex ?? MutexImpl()); + // funnel table updates through the upstreamPort + connection.updates.forEach((element) { + options.upstreamPort?.sendPort.send(element); + }); + return connection; } @override diff --git a/lib/src/web/worker/worker_utils.dart b/lib/src/web/worker/worker_utils.dart index cf5b611..46c276b 100644 --- a/lib/src/web/worker/worker_utils.dart +++ b/lib/src/web/worker/worker_utils.dart @@ -1,5 +1,5 @@ import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/web/database/web_db_context.dart'; +import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; void setupCommonWorkerDB(CommonDatabase database) { /// Exposes autocommit via a query function From 9755051391731c7ef10f7bb87f0db8fc6df4a88f Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Tue, 20 Feb 2024 16:59:51 +0200 Subject: [PATCH 03/10] fix race condition for write connections in native SqliteConnectionPool --- lib/src/native/database/connection_pool.dart | 20 ++++++++++++++++++-- lib/src/sqlite_connection.dart | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index d46c702..ce5243a 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -124,8 +124,24 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (_writeConnection?.closed == true) { _writeConnection = null; } - _writeConnection ??= await _openPrimaryConnection( - debugName: debugName != null ? '$debugName-writer' : null); + + /// Prevent possible race condition where multiple writeLock requests + /// could potentially open multiple writeConnections + if (_writeConnection == null) { + await mutex.lock(() async { + if (_writeConnection != null) { + return; + } + _writeConnection ??= await _factory.openConnection(SqliteOpenOptions( + upstreamPort: _upstreamPort, + primaryConnection: false, + updates: updates, + debugName: debugName != null ? '$debugName-writer' : null, + mutex: mutex, + readOnly: false)); + }); + } + return _runZoned(() { return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index 2ff570e..b9edd36 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -75,7 +75,7 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// Abstract class representing a connection to the SQLite database. abstract class SqliteConnection extends SqliteWriteContext { - /// Reports table change update notification + /// Reports table change update notifications Stream? get updates; /// Open a read-only transaction. From e6ae7c26ff963e71cf94b231d27d1d61e8b2600f Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 21 Feb 2024 11:58:10 +0200 Subject: [PATCH 04/10] wip remove upstreamport from sqliteoptions --- lib/src/common/abstract_open_factory.dart | 6 +----- lib/src/native/database/connection_pool.dart | 2 -- lib/src/native/native_sqlite_open_factory.dart | 1 - lib/src/web/web_sqlite_open_factory.dart | 7 +------ 4 files changed, 2 insertions(+), 14 deletions(-) diff --git a/lib/src/common/abstract_open_factory.dart b/lib/src/common/abstract_open_factory.dart index 760efb1..82534b9 100644 --- a/lib/src/common/abstract_open_factory.dart +++ b/lib/src/common/abstract_open_factory.dart @@ -3,7 +3,6 @@ import 'package:meta/meta.dart'; import 'package:sqlite_async/sqlite3_common.dart' as sqlite; import 'package:sqlite_async/src/common/mutex.dart'; -import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/update_notification.dart'; @@ -35,8 +34,6 @@ class SqliteOpenOptions { /// Name used in debug logs final String? debugName; - final SerializedPortClient? upstreamPort; - /// Stream of external update notifications final Stream? updates; @@ -45,8 +42,7 @@ class SqliteOpenOptions { required this.readOnly, this.mutex, this.debugName, - this.updates, - this.upstreamPort}); + this.updates}); sqlite.OpenMode get openMode { if (primaryConnection) { diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index ce5243a..3cc7df4 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -133,7 +133,6 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } _writeConnection ??= await _factory.openConnection(SqliteOpenOptions( - upstreamPort: _upstreamPort, primaryConnection: false, updates: updates, debugName: debugName != null ? '$debugName-writer' : null, @@ -205,7 +204,6 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { FutureOr _openPrimaryConnection({String? debugName}) { return _factory.openConnection(SqliteOpenOptions( - upstreamPort: _upstreamPort, primaryConnection: true, updates: updates, debugName: debugName, diff --git a/lib/src/native/native_sqlite_open_factory.dart b/lib/src/native/native_sqlite_open_factory.dart index c350c3d..99fd8e4 100644 --- a/lib/src/native/native_sqlite_open_factory.dart +++ b/lib/src/native/native_sqlite_open_factory.dart @@ -46,7 +46,6 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { primary: options.primaryConnection, readOnly: options.readOnly, mutex: options.mutex!, - upstreamPort: options.upstreamPort!, debugName: options.debugName, updates: options.updates, openFactory: this, diff --git a/lib/src/web/web_sqlite_open_factory.dart b/lib/src/web/web_sqlite_open_factory.dart index 6f6ccb3..8bb7f4e 100644 --- a/lib/src/web/web_sqlite_open_factory.dart +++ b/lib/src/web/web_sqlite_open_factory.dart @@ -45,12 +45,7 @@ class DefaultSqliteOpenFactory await db.resolvedExecutor.ensureOpen(DriftSqliteUser()); - final connection = DriftSqliteConnection(db, options.mutex ?? MutexImpl()); - // funnel table updates through the upstreamPort - connection.updates.forEach((element) { - options.upstreamPort?.sendPort.send(element); - }); - return connection; + return DriftSqliteConnection(db, options.mutex ?? MutexImpl()); } @override From 90aba60e666c0ec6c2efbd3e1b4ba397e32cecef Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 23 Feb 2024 11:43:25 +0200 Subject: [PATCH 05/10] remove upstreamPort from SqliteOptions --- .../common/isolate_connection_factory.dart | 7 +- lib/src/native/database/connection_pool.dart | 59 +++++++-------- .../native_sqlite_connection_impl.dart | 12 +++- .../database/native_sqlite_database.dart | 70 ++---------------- .../native_isolate_connection_factory.dart | 71 ++++++++++++++++--- lib/src/sqlite_queries.dart | 3 - 6 files changed, 106 insertions(+), 116 deletions(-) diff --git a/lib/src/common/isolate_connection_factory.dart b/lib/src/common/isolate_connection_factory.dart index 7a514d9..76b5665 100644 --- a/lib/src/common/isolate_connection_factory.dart +++ b/lib/src/common/isolate_connection_factory.dart @@ -34,10 +34,9 @@ abstract class IsolateConnectionFactory required mutex, SerializedPortClient? upstreamPort}) { return IsolateConnectionFactoryImpl( - openFactory: openFactory, - mutex: mutex, - upstreamPort: upstreamPort as SerializedPortClient) - as IsolateConnectionFactory; + openFactory: openFactory, + mutex: mutex, + upstreamPort: upstreamPort) as IsolateConnectionFactory; } /// Open a new SqliteConnection. diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 3cc7df4..729c0a7 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -1,25 +1,22 @@ import 'dart:async'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; -import 'package:sqlite_async/src/common/mutex.dart'; -import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'package:sqlite_async/src/sqlite_queries.dart'; -import 'package:sqlite_async/src/update_notification.dart'; /// A connection pool with a single write connection and multiple read connections. class SqliteConnectionPool with SqliteQueries implements SqliteConnection { - SqliteConnection? _writeConnection; + final StreamController updatesController = + StreamController.broadcast(); + + SqliteConnectionImpl? _writeConnection; final List _readConnections = []; final AbstractDefaultSqliteOpenFactory _factory; - final SerializedPortClient _upstreamPort; @override - final Stream? updates; + Stream? updates; final int maxReaders; @@ -41,14 +38,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// Read connections are opened in read-only mode, and will reject any statements /// that modify the database. SqliteConnectionPool(this._factory, - {this.updates, + {Stream? updatesStream, this.maxReaders = 5, - SqliteConnection? writeConnection, + SqliteConnectionImpl? writeConnection, this.debugName, - required this.mutex, - required SerializedPortClient upstreamPort}) - : _writeConnection = writeConnection, - _upstreamPort = upstreamPort; + required this.mutex}) + : _writeConnection = writeConnection { + updates = updatesStream ?? updatesController.stream; + } /// Returns true if the _write_ connection is currently in autocommit mode. @override @@ -132,12 +129,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (_writeConnection != null) { return; } - _writeConnection ??= await _factory.openConnection(SqliteOpenOptions( + _writeConnection ??= (await _factory.openConnection(SqliteOpenOptions( primaryConnection: false, - updates: updates, debugName: debugName != null ? '$debugName-writer' : null, mutex: mutex, - readOnly: false)); + readOnly: false))) as SqliteConnectionImpl; + + _writeConnection!.updates?.forEach((update) { + updatesController.add(update); + }); }); } @@ -171,15 +171,13 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { var name = debugName == null ? null : '$debugName-${_readConnections.length + 1}'; - var connection = SqliteConnectionImpl( - upstreamPort: _upstreamPort, - primary: false, + var connection = await _factory.openConnection(SqliteOpenOptions( + primaryConnection: false, updates: updates, debugName: name, mutex: mutex, - readOnly: true, - openFactory: _factory); - _readConnections.add(connection); + readOnly: true)); + _readConnections.add(connection as SqliteConnectionImpl); // Edge case: // If we don't await here, there is a chance that a different connection @@ -190,6 +188,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } } + SerializedPortClient? get upstreamPort { + return _writeConnection?.upstreamPort; + } + @override Future close() async { closed = true; @@ -201,13 +203,4 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { // read-only connections first. await _writeConnection?.close(); } - - FutureOr _openPrimaryConnection({String? debugName}) { - return _factory.openConnection(SqliteOpenOptions( - primaryConnection: true, - updates: updates, - debugName: debugName, - mutex: mutex, - readOnly: false)); - } } diff --git a/lib/src/native/database/native_sqlite_connection_impl.dart b/lib/src/native/database/native_sqlite_connection_impl.dart index 9351330..4939149 100644 --- a/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/lib/src/native/database/native_sqlite_connection_impl.dart @@ -6,6 +6,7 @@ import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; @@ -16,14 +17,16 @@ typedef TxCallback = Future Function(CommonDatabase db); /// Implements a SqliteConnection using a separate isolate for the database /// operations. -class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { +class SqliteConnectionImpl + with SqliteQueries, UpStreamTableUpdates + implements SqliteConnection { /// Private to this connection final SimpleMutex _connectionMutex = SimpleMutex(); final Mutex _writeMutex; /// Must be a broadcast stream @override - final Stream? updates; + Stream? updates; final ParentPortClient _isolateClient = ParentPortClient(); late final Isolate _isolate; final String? debugName; @@ -32,12 +35,14 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { SqliteConnectionImpl( {required openFactory, required Mutex mutex, - required SerializedPortClient upstreamPort, + SerializedPortClient? port, this.updates, this.debugName, this.readOnly = false, bool primary = false}) : _writeMutex = mutex { + upstreamPort = port ?? listenForEvents(); + updates = updates ?? updatesController.stream; _open(openFactory, primary: primary, upstreamPort: upstreamPort); } @@ -88,6 +93,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future close() async { + eventsPort?.close(); await _connectionMutex.lock(() async { if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index a516c27..77bff54 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -1,9 +1,7 @@ import 'dart:async'; -import 'dart:isolate'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; -import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/database/connection_pool.dart'; import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; @@ -12,8 +10,6 @@ import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_options.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/utils/native_database_utils.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; /// A SQLite database instance. /// @@ -23,7 +19,7 @@ class SqliteDatabaseImpl with SqliteQueries, SqliteDatabaseMixin implements SqliteDatabase { @override - final AbstractDefaultSqliteOpenFactory openFactory; + final DefaultSqliteOpenFactory openFactory; @override late Stream updates; @@ -35,8 +31,6 @@ class SqliteDatabaseImpl // Native doesn't require any asynchronous initialization late Future isInitialized = Future.value(); - late final PortServer _eventsPort; - late final SqliteConnectionPool _pool; /// Global lock to serialize write transactions. @@ -70,18 +64,11 @@ class SqliteDatabaseImpl /// 2. Running additional per-connection PRAGMA statements on each connection. /// 3. Creating custom SQLite functions. /// 4. Creating temporary views or triggers. - SqliteDatabaseImpl.withFactory(this.openFactory, - {this.maxReaders = SqliteDatabase.defaultMaxReaders}) { - updates = updatesController.stream; - - _listenForEvents(); - + SqliteDatabaseImpl.withFactory(AbstractDefaultSqliteOpenFactory factory, + {this.maxReaders = SqliteDatabase.defaultMaxReaders}) + : openFactory = factory as DefaultSqliteOpenFactory { _pool = SqliteConnectionPool(openFactory, - upstreamPort: _eventsPort.client(), - updates: updates, - debugName: 'sqlite', - maxReaders: maxReaders, - mutex: mutex); + debugName: 'sqlite', maxReaders: maxReaders, mutex: mutex); } @override @@ -96,50 +83,6 @@ class SqliteDatabaseImpl return _pool.getAutoCommit(); } - void _listenForEvents() { - UpdateNotification? updates; - - Map subscriptions = {}; - - _eventsPort = PortServer((message) async { - if (message is UpdateNotification) { - if (updates == null) { - updates = message; - // Use the mutex to only send updates after the current transaction. - // Do take care to avoid getting a lock for each individual update - - // that could add massive performance overhead. - mutex.lock(() async { - if (updates != null) { - updatesController.add(updates!); - updates = null; - } - }); - } else { - updates!.tables.addAll(message.tables); - } - return null; - } else if (message is InitDb) { - await isInitialized; - return null; - } else if (message is SubscribeToUpdates) { - if (subscriptions.containsKey(message.port)) { - return; - } - final subscription = updatesController.stream.listen((event) { - message.port.send(event); - }); - subscriptions[message.port] = subscription; - return null; - } else if (message is UnsubscribeToUpdates) { - final subscription = subscriptions.remove(message.port); - subscription?.cancel(); - return null; - } else { - throw ArgumentError('Unknown message type: $message'); - } - }); - } - /// A connection factory that can be passed to different isolates. /// /// Use this to access the database in background isolates. @@ -148,14 +91,13 @@ class SqliteDatabaseImpl return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex.shared, - upstreamPort: _eventsPort.client()); + port: _pool.upstreamPort); } @override Future close() async { await _pool.close(); updatesController.close(); - _eventsPort.close(); await mutex.close(); } diff --git a/lib/src/native/native_isolate_connection_factory.dart b/lib/src/native/native_isolate_connection_factory.dart index d49044d..19e732d 100644 --- a/lib/src/native/native_isolate_connection_factory.dart +++ b/lib/src/native/native_isolate_connection_factory.dart @@ -1,32 +1,85 @@ import 'dart:async'; import 'dart:isolate'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; -import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; +import 'package:sqlite_async/src/native/native_sqlite_open_factory.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/database_utils.dart'; import 'database/native_sqlite_connection_impl.dart'; +mixin UpStreamTableUpdates { + final StreamController updatesController = + StreamController.broadcast(); + + late SerializedPortClient upstreamPort; + + @protected + PortServer? eventsPort; + + @protected + SerializedPortClient listenForEvents() { + UpdateNotification? updates; + + Map subscriptions = {}; + + eventsPort = PortServer((message) async { + if (message is UpdateNotification) { + if (updates == null) { + updates = message; + // Use the mutex to only send updates after the current transaction. + // Do take care to avoid getting a lock for each individual update - + // that could add massive performance overhead. + if (updates != null) { + updatesController.add(updates!); + updates = null; + } + } else { + updates!.tables.addAll(message.tables); + } + return null; + } else if (message is InitDb) { + return null; + } else if (message is SubscribeToUpdates) { + if (subscriptions.containsKey(message.port)) { + return; + } + final subscription = updatesController.stream.listen((event) { + message.port.send(event); + }); + subscriptions[message.port] = subscription; + return null; + } else if (message is UnsubscribeToUpdates) { + final subscription = subscriptions.remove(message.port); + subscription?.cancel(); + return null; + } else { + throw ArgumentError('Unknown message type: $message'); + } + }); + return upstreamPort = eventsPort!.client(); + } +} + /// A connection factory that can be passed to different isolates. class IsolateConnectionFactoryImpl - with IsolateOpenFactoryMixin + with IsolateOpenFactoryMixin, UpStreamTableUpdates implements IsolateConnectionFactory { @override - AbstractDefaultSqliteOpenFactory openFactory; + DefaultSqliteOpenFactory openFactory; @override SerializedMutex mutex; - @override - SerializedPortClient upstreamPort; - IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - required this.upstreamPort}); + SerializedPortClient? port}) { + upstreamPort = port ?? listenForEvents(); + } /// Open a new SqliteConnection. /// @@ -40,7 +93,7 @@ class IsolateConnectionFactoryImpl return _IsolateSqliteConnection( openFactory: openFactory, mutex: openMutex, - upstreamPort: upstreamPort, + port: upstreamPort, readOnly: readOnly, debugName: debugName, updates: updates.stream, @@ -88,7 +141,7 @@ class _IsolateSqliteConnection extends SqliteConnectionImpl { _IsolateSqliteConnection( {required super.openFactory, required super.mutex, - required super.upstreamPort, + super.port, super.updates, super.debugName, super.readOnly = false, diff --git a/lib/src/sqlite_queries.dart b/lib/src/sqlite_queries.dart index 959a97d..d0eab7a 100644 --- a/lib/src/sqlite_queries.dart +++ b/lib/src/sqlite_queries.dart @@ -9,9 +9,6 @@ import 'update_notification.dart'; /// Classes using this need to implement [SqliteConnection.readLock] /// and [SqliteConnection.writeLock]. mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { - /// Broadcast stream that is notified of any table updates - Stream? get updates; - @override Future execute(String sql, [List parameters = const []]) async { From b20c9c241540f7802e4d3fdbd7ec090c8d7d6877 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 23 Feb 2024 11:45:56 +0200 Subject: [PATCH 06/10] fix port name --- lib/src/common/isolate_connection_factory.dart | 6 ++---- lib/src/impl/stub_isolate_connection_factory.dart | 2 +- lib/src/web/web_isolate_connection_factory.dart | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/src/common/isolate_connection_factory.dart b/lib/src/common/isolate_connection_factory.dart index 76b5665..b082f75 100644 --- a/lib/src/common/isolate_connection_factory.dart +++ b/lib/src/common/isolate_connection_factory.dart @@ -30,13 +30,11 @@ abstract class IsolateConnectionFactory SerializedPortClient get upstreamPort; factory IsolateConnectionFactory( - {required openFactory, - required mutex, - SerializedPortClient? upstreamPort}) { + {required openFactory, required mutex, SerializedPortClient? port}) { return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex, - upstreamPort: upstreamPort) as IsolateConnectionFactory; + port: port) as IsolateConnectionFactory; } /// Open a new SqliteConnection. diff --git a/lib/src/impl/stub_isolate_connection_factory.dart b/lib/src/impl/stub_isolate_connection_factory.dart index 208b7b8..2a67a98 100644 --- a/lib/src/impl/stub_isolate_connection_factory.dart +++ b/lib/src/impl/stub_isolate_connection_factory.dart @@ -16,7 +16,7 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required Mutex mutex, - SerializedPortClient? upstreamPort}); + SerializedPortClient? port}); @override diff --git a/lib/src/web/web_isolate_connection_factory.dart b/lib/src/web/web_isolate_connection_factory.dart index e1f46ae..d6bc6ae 100644 --- a/lib/src/web/web_isolate_connection_factory.dart +++ b/lib/src/web/web_isolate_connection_factory.dart @@ -21,7 +21,7 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? upstreamPort}); + SerializedPortClient? port}); /// Open a new SqliteConnection. /// From ba73ed89671a9bf876b22af258811d0979a10011 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 28 Feb 2024 11:02:15 +0200 Subject: [PATCH 07/10] wait for write connection in connection pool --- lib/src/native/database/connection_pool.dart | 50 +++++++++---------- .../native_sqlite_connection_impl.dart | 2 +- .../database/native_sqlite_database.dart | 20 +++++++- .../native_isolate_connection_factory.dart | 6 +++ test/migration_test.dart | 1 + 5 files changed, 51 insertions(+), 28 deletions(-) diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 729c0a7..ebfb3ff 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -9,15 +9,19 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final StreamController updatesController = StreamController.broadcast(); + @override + + /// The write connection might be recreated if it's closed + /// This will allow the update stream remain constant even + /// after using a new write connection. + late Stream updates = updatesController.stream; + SqliteConnectionImpl? _writeConnection; final List _readConnections = []; final AbstractDefaultSqliteOpenFactory _factory; - @override - Stream? updates; - final int maxReaders; final String? debugName; @@ -38,13 +42,13 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// Read connections are opened in read-only mode, and will reject any statements /// that modify the database. SqliteConnectionPool(this._factory, - {Stream? updatesStream, - this.maxReaders = 5, + {this.maxReaders = 5, SqliteConnectionImpl? writeConnection, this.debugName, required this.mutex}) : _writeConnection = writeConnection { - updates = updatesStream ?? updatesController.stream; + // Use the write connection's updates + _writeConnection?.updates?.forEach(updatesController.add); } /// Returns true if the _write_ connection is currently in autocommit mode. @@ -122,23 +126,13 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { _writeConnection = null; } - /// Prevent possible race condition where multiple writeLock requests - /// could potentially open multiple writeConnections if (_writeConnection == null) { - await mutex.lock(() async { - if (_writeConnection != null) { - return; - } - _writeConnection ??= (await _factory.openConnection(SqliteOpenOptions( - primaryConnection: false, - debugName: debugName != null ? '$debugName-writer' : null, - mutex: mutex, - readOnly: false))) as SqliteConnectionImpl; - - _writeConnection!.updates?.forEach((update) { - updatesController.add(update); - }); - }); + _writeConnection ??= (await _factory.openConnection(SqliteOpenOptions( + primaryConnection: false, + debugName: debugName != null ? '$debugName-writer' : null, + mutex: mutex, + readOnly: false))) as SqliteConnectionImpl; + _writeConnection!.updates?.forEach(updatesController.add); } return _runZoned(() { @@ -163,6 +157,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } Future _expandPool() async { + await _writeConnection?.ready; + if (closed || _readConnections.length >= maxReaders) { return; } @@ -171,13 +167,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { var name = debugName == null ? null : '$debugName-${_readConnections.length + 1}'; - var connection = await _factory.openConnection(SqliteOpenOptions( - primaryConnection: false, + var connection = SqliteConnectionImpl( + // The port is used to confirm the write connection has been initialized + port: _writeConnection?.upstreamPort, + primary: false, updates: updates, debugName: name, mutex: mutex, - readOnly: true)); - _readConnections.add(connection as SqliteConnectionImpl); + readOnly: true, + openFactory: _factory); // Edge case: // If we don't await here, there is a chance that a different connection diff --git a/lib/src/native/database/native_sqlite_connection_impl.dart b/lib/src/native/database/native_sqlite_connection_impl.dart index 4939149..5f66253 100644 --- a/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/lib/src/native/database/native_sqlite_connection_impl.dart @@ -86,7 +86,7 @@ class SqliteConnectionImpl paused: true); _isolateClient.tieToIsolate(_isolate); _isolate.resume(_isolate.pauseCapability!); - + isInitialized = _isolateClient.ready; await _isolateClient.ready; }); } diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index 77bff54..f86890a 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; import 'package:sqlite_async/src/native/database/connection_pool.dart'; +import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart'; import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; import 'package:sqlite_async/src/native/native_sqlite_open_factory.dart'; @@ -21,6 +22,8 @@ class SqliteDatabaseImpl @override final DefaultSqliteOpenFactory openFactory; + late final SqliteConnectionImpl _internalConnection; + @override late Stream updates; @@ -67,8 +70,14 @@ class SqliteDatabaseImpl SqliteDatabaseImpl.withFactory(AbstractDefaultSqliteOpenFactory factory, {this.maxReaders = SqliteDatabase.defaultMaxReaders}) : openFactory = factory as DefaultSqliteOpenFactory { + _internalConnection = _openPrimaryConnection(debugName: 'sqlite-writer'); _pool = SqliteConnectionPool(openFactory, - debugName: 'sqlite', maxReaders: maxReaders, mutex: mutex); + writeConnection: _internalConnection, + debugName: 'sqlite', + maxReaders: maxReaders, + mutex: mutex); + // Updates get updates from the pool + updates = _pool.updates; } @override @@ -144,4 +153,13 @@ class SqliteDatabaseImpl return _pool.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + SqliteConnectionImpl _openPrimaryConnection({String? debugName}) { + return SqliteConnectionImpl( + primary: true, + debugName: debugName, + mutex: mutex, + readOnly: false, + openFactory: openFactory); + } } diff --git a/lib/src/native/native_isolate_connection_factory.dart b/lib/src/native/native_isolate_connection_factory.dart index 19e732d..0e7d60d 100644 --- a/lib/src/native/native_isolate_connection_factory.dart +++ b/lib/src/native/native_isolate_connection_factory.dart @@ -17,6 +17,11 @@ mixin UpStreamTableUpdates { late SerializedPortClient upstreamPort; + @protected + + /// Resolves once the primary connection is initialized + late Future isInitialized; + @protected PortServer? eventsPort; @@ -42,6 +47,7 @@ mixin UpStreamTableUpdates { } return null; } else if (message is InitDb) { + await isInitialized; return null; } else if (message is SubscribeToUpdates) { if (subscriptions.containsKey(message.port)) { diff --git a/test/migration_test.dart b/test/migration_test.dart index 37d4c79..ab81957 100644 --- a/test/migration_test.dart +++ b/test/migration_test.dart @@ -1,3 +1,4 @@ +// @Timeout(Duration(seconds: 9000)) import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; From d1d5d275fe55b12cc0ac3ae52bdf1f6e4e340632 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 28 Feb 2024 10:54:30 +0000 Subject: [PATCH 08/10] cleanup naming for parameters --- .gitignore | 1 + lib/src/common/isolate_connection_factory.dart | 6 ++++-- lib/src/common/sqlite_database.dart | 2 ++ lib/src/impl/stub_isolate_connection_factory.dart | 2 +- lib/src/impl/stub_sqlite_database.dart | 2 ++ lib/src/native/database/connection_pool.dart | 8 +++++--- .../database/native_sqlite_connection_impl.dart | 14 +++++++------- .../native/database/native_sqlite_database.dart | 4 +++- .../native/native_isolate_connection_factory.dart | 8 ++++---- lib/src/web/database/web_sqlite_database.dart | 12 ++++++++++-- lib/src/web/web_isolate_connection_factory.dart | 2 +- 11 files changed, 40 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index 5c41ae2..f88fffb 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ assets .idea .vscode +.devcontainer *.db *.db-* test-db diff --git a/lib/src/common/isolate_connection_factory.dart b/lib/src/common/isolate_connection_factory.dart index b082f75..76b5665 100644 --- a/lib/src/common/isolate_connection_factory.dart +++ b/lib/src/common/isolate_connection_factory.dart @@ -30,11 +30,13 @@ abstract class IsolateConnectionFactory SerializedPortClient get upstreamPort; factory IsolateConnectionFactory( - {required openFactory, required mutex, SerializedPortClient? port}) { + {required openFactory, + required mutex, + SerializedPortClient? upstreamPort}) { return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex, - port: port) as IsolateConnectionFactory; + upstreamPort: upstreamPort) as IsolateConnectionFactory; } /// Open a new SqliteConnection. diff --git a/lib/src/common/sqlite_database.dart b/lib/src/common/sqlite_database.dart index b8380c8..8df4174 100644 --- a/lib/src/common/sqlite_database.dart +++ b/lib/src/common/sqlite_database.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/impl/sqlite_database_impl.dart'; @@ -26,6 +27,7 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { final StreamController updatesController = StreamController.broadcast(); + @protected Future get isInitialized; /// Wait for initialization to complete. diff --git a/lib/src/impl/stub_isolate_connection_factory.dart b/lib/src/impl/stub_isolate_connection_factory.dart index 2a67a98..208b7b8 100644 --- a/lib/src/impl/stub_isolate_connection_factory.dart +++ b/lib/src/impl/stub_isolate_connection_factory.dart @@ -16,7 +16,7 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required Mutex mutex, - SerializedPortClient? port}); + SerializedPortClient? upstreamPort}); @override diff --git a/lib/src/impl/stub_sqlite_database.dart b/lib/src/impl/stub_sqlite_database.dart index adc8bbd..29db641 100644 --- a/lib/src/impl/stub_sqlite_database.dart +++ b/lib/src/impl/stub_sqlite_database.dart @@ -1,3 +1,4 @@ +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; @@ -31,6 +32,7 @@ class SqliteDatabaseImpl } @override + @protected Future get isInitialized => throw UnimplementedError(); @override diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index ebfb3ff..9427a06 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -14,7 +14,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// The write connection might be recreated if it's closed /// This will allow the update stream remain constant even /// after using a new write connection. - late Stream updates = updatesController.stream; + late final Stream updates = updatesController.stream; SqliteConnectionImpl? _writeConnection; @@ -127,11 +127,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } if (_writeConnection == null) { - _writeConnection ??= (await _factory.openConnection(SqliteOpenOptions( + _writeConnection = (await _factory.openConnection(SqliteOpenOptions( primaryConnection: false, debugName: debugName != null ? '$debugName-writer' : null, mutex: mutex, readOnly: false))) as SqliteConnectionImpl; + // Expose the new updates on the connection pool _writeConnection!.updates?.forEach(updatesController.add); } @@ -169,13 +170,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { : '$debugName-${_readConnections.length + 1}'; var connection = SqliteConnectionImpl( // The port is used to confirm the write connection has been initialized - port: _writeConnection?.upstreamPort, + upstreamPort: _writeConnection?.upstreamPort, primary: false, updates: updates, debugName: name, mutex: mutex, readOnly: true, openFactory: _factory); + _readConnections.add(connection); // Edge case: // If we don't await here, there is a chance that a different connection diff --git a/lib/src/native/database/native_sqlite_connection_impl.dart b/lib/src/native/database/native_sqlite_connection_impl.dart index 5f66253..d0f5450 100644 --- a/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/lib/src/native/database/native_sqlite_connection_impl.dart @@ -26,8 +26,8 @@ class SqliteConnectionImpl /// Must be a broadcast stream @override - Stream? updates; - final ParentPortClient _isolateClient = ParentPortClient(); + late final Stream? updates; + late final ParentPortClient _isolateClient = ParentPortClient(); late final Isolate _isolate; final String? debugName; final bool readOnly; @@ -35,15 +35,15 @@ class SqliteConnectionImpl SqliteConnectionImpl( {required openFactory, required Mutex mutex, - SerializedPortClient? port, - this.updates, + SerializedPortClient? upstreamPort, + Stream? updates, this.debugName, this.readOnly = false, bool primary = false}) : _writeMutex = mutex { - upstreamPort = port ?? listenForEvents(); - updates = updates ?? updatesController.stream; - _open(openFactory, primary: primary, upstreamPort: upstreamPort); + this.upstreamPort = upstreamPort ?? listenForEvents(); + this.updates = updates ?? updatesController.stream; + _open(openFactory, primary: primary, upstreamPort: this.upstreamPort); } Future get ready async { diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index f86890a..a4cbf62 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/sqlite_database.dart'; import 'package:sqlite_async/src/native/database/connection_pool.dart'; @@ -31,6 +32,7 @@ class SqliteDatabaseImpl int maxReaders; @override + @protected // Native doesn't require any asynchronous initialization late Future isInitialized = Future.value(); @@ -100,7 +102,7 @@ class SqliteDatabaseImpl return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex.shared, - port: _pool.upstreamPort); + upstreamPort: _pool.upstreamPort); } @override diff --git a/lib/src/native/native_isolate_connection_factory.dart b/lib/src/native/native_isolate_connection_factory.dart index 0e7d60d..c74c06f 100644 --- a/lib/src/native/native_isolate_connection_factory.dart +++ b/lib/src/native/native_isolate_connection_factory.dart @@ -83,8 +83,8 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? port}) { - upstreamPort = port ?? listenForEvents(); + SerializedPortClient? upstreamPort}) { + this.upstreamPort = upstreamPort ?? listenForEvents(); } /// Open a new SqliteConnection. @@ -99,7 +99,7 @@ class IsolateConnectionFactoryImpl return _IsolateSqliteConnection( openFactory: openFactory, mutex: openMutex, - port: upstreamPort, + upstreamPort: upstreamPort, readOnly: readOnly, debugName: debugName, updates: updates.stream, @@ -147,7 +147,7 @@ class _IsolateSqliteConnection extends SqliteConnectionImpl { _IsolateSqliteConnection( {required super.openFactory, required super.mutex, - super.port, + super.upstreamPort, super.updates, super.debugName, super.readOnly = false, diff --git a/lib/src/web/database/web_sqlite_database.dart b/lib/src/web/database/web_sqlite_database.dart index 3a16365..a92ceac 100644 --- a/lib/src/web/database/web_sqlite_database.dart +++ b/lib/src/web/database/web_sqlite_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; @@ -27,6 +28,7 @@ class SqliteDatabaseImpl int maxReaders; @override + @protected late Future isInitialized; @override @@ -73,7 +75,7 @@ class SqliteDatabaseImpl Future _init() async { _connection = await openFactory.openConnection(SqliteOpenOptions( primaryConnection: true, readOnly: false, mutex: mutex)); - _connection.updates?.forEach((update) { + _connection.updates!.forEach((update) { updatesController.add(update); }); } @@ -81,6 +83,7 @@ class SqliteDatabaseImpl @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.readLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } @@ -88,6 +91,7 @@ class SqliteDatabaseImpl @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } @@ -97,6 +101,7 @@ class SqliteDatabaseImpl Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.readTransaction(callback, lockTimeout: lockTimeout); } @@ -105,11 +110,13 @@ class SqliteDatabaseImpl Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { + await isInitialized; return _connection.writeTransaction(callback, lockTimeout: lockTimeout); } @override Future close() async { + await isInitialized; return _connection.close(); } @@ -119,7 +126,8 @@ class SqliteDatabaseImpl } @override - Future getAutoCommit() { + Future getAutoCommit() async { + await isInitialized; return _connection.getAutoCommit(); } } diff --git a/lib/src/web/web_isolate_connection_factory.dart b/lib/src/web/web_isolate_connection_factory.dart index d6bc6ae..e1f46ae 100644 --- a/lib/src/web/web_isolate_connection_factory.dart +++ b/lib/src/web/web_isolate_connection_factory.dart @@ -21,7 +21,7 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? port}); + SerializedPortClient? upstreamPort}); /// Open a new SqliteConnection. /// From fbe7474b73c993e285a87e0d2f3ce9faa981dfde Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 28 Feb 2024 11:49:07 +0000 Subject: [PATCH 09/10] fix tests --- lib/src/native/database/connection_pool.dart | 2 +- test/migration_test.dart | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index 9427a06..df8b549 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -128,7 +128,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (_writeConnection == null) { _writeConnection = (await _factory.openConnection(SqliteOpenOptions( - primaryConnection: false, + primaryConnection: true, debugName: debugName != null ? '$debugName-writer' : null, mutex: mutex, readOnly: false))) as SqliteConnectionImpl; diff --git a/test/migration_test.dart b/test/migration_test.dart index ab81957..37d4c79 100644 --- a/test/migration_test.dart +++ b/test/migration_test.dart @@ -1,4 +1,3 @@ -// @Timeout(Duration(seconds: 9000)) import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; From 3dc6f23e29fc96b7e38b4d9ee64e647252fad148 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 28 Feb 2024 12:31:42 +0000 Subject: [PATCH 10/10] cleanup --- .../common/isolate_connection_factory.dart | 2 +- lib/src/native/database/connection_pool.dart | 3 - .../native_sqlite_connection_impl.dart | 6 +- .../database/native_sqlite_database.dart | 11 ++- lib/src/native/database/upstream_updates.dart | 67 ++++++++++++++++++ .../native_isolate_connection_factory.dart | 69 ++----------------- .../web/web_isolate_connection_factory.dart | 2 +- 7 files changed, 83 insertions(+), 77 deletions(-) create mode 100644 lib/src/native/database/upstream_updates.dart diff --git a/lib/src/common/isolate_connection_factory.dart b/lib/src/common/isolate_connection_factory.dart index 76b5665..90dc9b1 100644 --- a/lib/src/common/isolate_connection_factory.dart +++ b/lib/src/common/isolate_connection_factory.dart @@ -32,7 +32,7 @@ abstract class IsolateConnectionFactory factory IsolateConnectionFactory( {required openFactory, required mutex, - SerializedPortClient? upstreamPort}) { + required SerializedPortClient upstreamPort}) { return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex, diff --git a/lib/src/native/database/connection_pool.dart b/lib/src/native/database/connection_pool.dart index df8b549..2dd8374 100644 --- a/lib/src/native/database/connection_pool.dart +++ b/lib/src/native/database/connection_pool.dart @@ -158,8 +158,6 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } Future _expandPool() async { - await _writeConnection?.ready; - if (closed || _readConnections.length >= maxReaders) { return; } @@ -169,7 +167,6 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { ? null : '$debugName-${_readConnections.length + 1}'; var connection = SqliteConnectionImpl( - // The port is used to confirm the write connection has been initialized upstreamPort: _writeConnection?.upstreamPort, primary: false, updates: updates, diff --git a/lib/src/native/database/native_sqlite_connection_impl.dart b/lib/src/native/database/native_sqlite_connection_impl.dart index d0f5450..fd05957 100644 --- a/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/lib/src/native/database/native_sqlite_connection_impl.dart @@ -6,13 +6,14 @@ import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/src/common/abstract_open_factory.dart'; import 'package:sqlite_async/src/common/mutex.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; -import 'package:sqlite_async/src/native/native_isolate_connection_factory.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; import 'package:sqlite_async/src/sqlite_connection.dart'; import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import 'upstream_updates.dart'; + typedef TxCallback = Future Function(CommonDatabase db); /// Implements a SqliteConnection using a separate isolate for the database @@ -27,7 +28,7 @@ class SqliteConnectionImpl /// Must be a broadcast stream @override late final Stream? updates; - late final ParentPortClient _isolateClient = ParentPortClient(); + final ParentPortClient _isolateClient = ParentPortClient(); late final Isolate _isolate; final String? debugName; final bool readOnly; @@ -42,6 +43,7 @@ class SqliteConnectionImpl bool primary = false}) : _writeMutex = mutex { this.upstreamPort = upstreamPort ?? listenForEvents(); + // Accept an incoming stream of updates, or expose one if not given. this.updates = updates ?? updatesController.stream; _open(openFactory, primary: primary, upstreamPort: this.upstreamPort); } diff --git a/lib/src/native/database/native_sqlite_database.dart b/lib/src/native/database/native_sqlite_database.dart index a4cbf62..c57cfd0 100644 --- a/lib/src/native/database/native_sqlite_database.dart +++ b/lib/src/native/database/native_sqlite_database.dart @@ -23,24 +23,23 @@ class SqliteDatabaseImpl @override final DefaultSqliteOpenFactory openFactory; - late final SqliteConnectionImpl _internalConnection; - @override late Stream updates; @override int maxReaders; + /// Global lock to serialize write transactions. + final SimpleMutex mutex = SimpleMutex(); + @override @protected // Native doesn't require any asynchronous initialization late Future isInitialized = Future.value(); + late final SqliteConnectionImpl _internalConnection; late final SqliteConnectionPool _pool; - /// Global lock to serialize write transactions. - final SimpleMutex mutex = SimpleMutex(); - /// Open a SqliteDatabase. /// /// Only a single SqliteDatabase per [path] should be opened at a time. @@ -102,7 +101,7 @@ class SqliteDatabaseImpl return IsolateConnectionFactoryImpl( openFactory: openFactory, mutex: mutex.shared, - upstreamPort: _pool.upstreamPort); + upstreamPort: _pool.upstreamPort!); } @override diff --git a/lib/src/native/database/upstream_updates.dart b/lib/src/native/database/upstream_updates.dart new file mode 100644 index 0000000..8d94cc5 --- /dev/null +++ b/lib/src/native/database/upstream_updates.dart @@ -0,0 +1,67 @@ +import 'dart:async'; +import 'dart:isolate'; + +import 'package:meta/meta.dart'; +import 'package:sqlite_async/src/common/port_channel.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:sqlite_async/src/utils/native_database_utils.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; + +mixin UpStreamTableUpdates { + final StreamController updatesController = + StreamController.broadcast(); + + late SerializedPortClient upstreamPort; + + @protected + + /// Resolves once the primary connection is initialized + late Future isInitialized; + + @protected + PortServer? eventsPort; + + @protected + SerializedPortClient listenForEvents() { + UpdateNotification? updates; + + Map subscriptions = {}; + + eventsPort = PortServer((message) async { + if (message is UpdateNotification) { + if (updates == null) { + updates = message; + // Use the mutex to only send updates after the current transaction. + // Do take care to avoid getting a lock for each individual update - + // that could add massive performance overhead. + if (updates != null) { + updatesController.add(updates!); + updates = null; + } + } else { + updates!.tables.addAll(message.tables); + } + return null; + } else if (message is InitDb) { + await isInitialized; + return null; + } else if (message is SubscribeToUpdates) { + if (subscriptions.containsKey(message.port)) { + return; + } + final subscription = updatesController.stream.listen((event) { + message.port.send(event); + }); + subscriptions[message.port] = subscription; + return null; + } else if (message is UnsubscribeToUpdates) { + final subscription = subscriptions.remove(message.port); + subscription?.cancel(); + return null; + } else { + throw ArgumentError('Unknown message type: $message'); + } + }); + return upstreamPort = eventsPort!.client(); + } +} diff --git a/lib/src/native/native_isolate_connection_factory.dart b/lib/src/native/native_isolate_connection_factory.dart index c74c06f..e9110c4 100644 --- a/lib/src/native/native_isolate_connection_factory.dart +++ b/lib/src/native/native_isolate_connection_factory.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'dart:isolate'; -import 'package:meta/meta.dart'; import 'package:sqlite_async/src/common/isolate_connection_factory.dart'; import 'package:sqlite_async/src/common/port_channel.dart'; import 'package:sqlite_async/src/native/native_isolate_mutex.dart'; @@ -11,68 +10,9 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/database_utils.dart'; import 'database/native_sqlite_connection_impl.dart'; -mixin UpStreamTableUpdates { - final StreamController updatesController = - StreamController.broadcast(); - - late SerializedPortClient upstreamPort; - - @protected - - /// Resolves once the primary connection is initialized - late Future isInitialized; - - @protected - PortServer? eventsPort; - - @protected - SerializedPortClient listenForEvents() { - UpdateNotification? updates; - - Map subscriptions = {}; - - eventsPort = PortServer((message) async { - if (message is UpdateNotification) { - if (updates == null) { - updates = message; - // Use the mutex to only send updates after the current transaction. - // Do take care to avoid getting a lock for each individual update - - // that could add massive performance overhead. - if (updates != null) { - updatesController.add(updates!); - updates = null; - } - } else { - updates!.tables.addAll(message.tables); - } - return null; - } else if (message is InitDb) { - await isInitialized; - return null; - } else if (message is SubscribeToUpdates) { - if (subscriptions.containsKey(message.port)) { - return; - } - final subscription = updatesController.stream.listen((event) { - message.port.send(event); - }); - subscriptions[message.port] = subscription; - return null; - } else if (message is UnsubscribeToUpdates) { - final subscription = subscriptions.remove(message.port); - subscription?.cancel(); - return null; - } else { - throw ArgumentError('Unknown message type: $message'); - } - }); - return upstreamPort = eventsPort!.client(); - } -} - /// A connection factory that can be passed to different isolates. class IsolateConnectionFactoryImpl - with IsolateOpenFactoryMixin, UpStreamTableUpdates + with IsolateOpenFactoryMixin implements IsolateConnectionFactory { @override DefaultSqliteOpenFactory openFactory; @@ -80,12 +20,13 @@ class IsolateConnectionFactoryImpl @override SerializedMutex mutex; + @override + final SerializedPortClient upstreamPort; + IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? upstreamPort}) { - this.upstreamPort = upstreamPort ?? listenForEvents(); - } + required this.upstreamPort}); /// Open a new SqliteConnection. /// diff --git a/lib/src/web/web_isolate_connection_factory.dart b/lib/src/web/web_isolate_connection_factory.dart index e1f46ae..2fa4f87 100644 --- a/lib/src/web/web_isolate_connection_factory.dart +++ b/lib/src/web/web_isolate_connection_factory.dart @@ -21,7 +21,7 @@ class IsolateConnectionFactoryImpl IsolateConnectionFactoryImpl( {required this.openFactory, required this.mutex, - SerializedPortClient? upstreamPort}); + required SerializedPortClient upstreamPort}); /// Open a new SqliteConnection. ///