diff --git a/lib/drift.dart b/lib/drift.dart deleted file mode 100644 index 9e038a3..0000000 --- a/lib/drift.dart +++ /dev/null @@ -1,6 +0,0 @@ -/// Re-exports [Drift](https://pub.dev/packages/drift) to expose drift without -/// adding it as a direct dependency. -library; - -export 'package:drift/wasm.dart'; -export 'package:sqlite_async/src/web/worker/worker_utils.dart'; diff --git a/lib/src/web/database.dart b/lib/src/web/database.dart new file mode 100644 index 0000000..343ae84 --- /dev/null +++ b/lib/src/web/database.dart @@ -0,0 +1,183 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:sqlite3/common.dart'; +import 'package:sqlite3_web/sqlite3_web.dart'; +import 'package:sqlite_async/mutex.dart'; +import 'package:sqlite_async/src/common/sqlite_database.dart'; +import 'package:sqlite_async/src/sqlite_connection.dart'; +import 'package:sqlite_async/src/sqlite_queries.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'protocol.dart'; + +class WebDatabase + with SqliteQueries, SqliteDatabaseMixin + implements SqliteDatabase { + final Database _database; + final Mutex? _mutex; + + @override + bool closed = false; + + WebDatabase(this._database, this._mutex); + + @override + Future close() async { + await _database.dispose(); + closed = true; + } + + @override + Future getAutoCommit() async { + final response = await _database.customRequest( + CustomDatabaseMessage(CustomDatabaseMessageKind.getAutoCommit)); + return (response as JSBoolean?)?.toDart ?? false; + } + + @override + Future initialize() { + return Future.value(); + } + + @override + Future get isInitialized => initialize(); + + @override + Never isolateConnectionFactory() { + throw UnimplementedError(); + } + + @override + int get maxReaders => throw UnimplementedError(); + + @override + Never get openFactory => throw UnimplementedError(); + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout, String? debugContext}) async { + if (_mutex case var mutex?) { + return await mutex.lock(() async { + final context = _SharedContext(this); + try { + return await callback(context); + } finally { + context.markClosed(); + } + }); + } else { + // No custom mutex, coordinate locks through shared worker. + await _database.customRequest( + CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); + + try { + return await callback(_SharedContext(this)); + } finally { + await _database.customRequest( + CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); + } + } + } + + @override + Stream get updates => + _database.updates.map((event) => UpdateNotification({event.tableName})); + + @override + // todo: Why do we have to expose both a stream and a controller? + StreamController get updatesController => + throw UnimplementedError(); + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout, String? debugContext}) async { + if (_mutex case var mutex?) { + return await mutex.lock(() async { + final context = _ExlusiveContext(this); + try { + return await callback(context); + } finally { + context.markClosed(); + } + }); + } else { + // No custom mutex, coordinate locks through shared worker. + await _database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.requestExclusiveLock)); + final context = _ExlusiveContext(this); + + try { + return await callback(context); + } finally { + context.markClosed(); + await _database.customRequest( + CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); + } + } + } +} + +class _SharedContext implements SqliteReadContext { + final WebDatabase _database; + bool _contextClosed = false; + + _SharedContext(this._database); + + @override + bool get closed => _contextClosed || _database.closed; + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) { + // Can't be implemented: The database may live on another worker. + throw UnimplementedError(); + } + + @override + Future get(String sql, [List parameters = const []]) async { + final results = await getAll(sql, parameters); + return results.single; + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + return await _database._database.select(sql, parameters); + } + + @override + Future getAutoCommit() async { + return _database.getAutoCommit(); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + final results = await getAll(sql, parameters); + return results.singleOrNull; + } + + void markClosed() { + _contextClosed = true; + } +} + +class _ExlusiveContext extends _SharedContext implements SqliteWriteContext { + _ExlusiveContext(super.database); + + @override + Future execute(String sql, + [List parameters = const []]) async { + return await _database._database.select(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + for (final set in parameterSets) { + // use execute instead of select to avoid transferring rows from the + // worker to this context. + await _database._database.execute(sql, set); + } + } +} diff --git a/lib/src/web/database/connection/drift_sqlite_connection.dart b/lib/src/web/database/connection/drift_sqlite_connection.dart deleted file mode 100644 index 2ee7f0b..0000000 --- a/lib/src/web/database/connection/drift_sqlite_connection.dart +++ /dev/null @@ -1,259 +0,0 @@ -import 'dart:async'; -import 'package:drift/drift.dart'; -import 'package:drift/remote.dart'; -import 'package:drift/wasm.dart'; -import 'package:meta/meta.dart'; -import 'package:sqlite_async/sqlite3_common.dart'; - -import 'package:sqlite_async/src/common/mutex.dart'; - -import 'package:sqlite_async/src/sqlite_connection.dart'; -import 'package:sqlite_async/src/sqlite_queries.dart'; -import 'package:sqlite_async/src/update_notification.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; - -/// Custom function which exposes CommonDatabase.autocommit -const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; - -class DriftSqliteConnection with SqliteQueries implements SqliteConnection { - WasmDatabaseResult db; - - @override - late Stream updates; - - final Mutex mutex; - - @override - bool closed = false; - - DriftSqliteConnection(this.db, this.mutex) { - // Pass on table updates - updates = db.resolvedExecutor.streamQueries - .updatesForSync(TableUpdateQuery.any()) - .map((tables) { - return UpdateNotification(tables.map((e) => e.table).toSet()); - }); - } - - @override - close() { - closed = true; - return db.resolvedExecutor.close(); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - try { - final result = await db.resolvedExecutor.runBatched(BatchedStatements( - [sql], - parameterSets - .map((e) => ArgumentsForBatchedStatement(0, e)) - .toList())); - return result; - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } - - Future select(String sql, - [List parameters = const []]) async { - try { - final result = await db.resolvedExecutor.runSelect(sql, parameters); - if (result.isEmpty) { - return ResultSet([], [], []); - } - return ResultSet(result.first.keys.toList(), [], - result.map((e) => e.values.toList()).toList()); - } on DriftRemoteException catch (e) { - if (e.toString().contains('SqliteException')) { - // Drift wraps these in remote errors - throw SqliteException(e.remoteCause.hashCode, e.remoteCause.toString()); - } - rethrow; - } - } - - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - return _runZoned( - () => mutex.lock(() async { - final context = - DriftReadContext(this, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, - String? debugContext, - bool isTransaction = false}) async { - return _runZoned( - () => mutex.lock(() async { - final context = - DriftWriteContext(this, isTransaction: isTransaction); - try { - final result = await callback(context); - return result; - } finally { - context.close(); - } - }, timeout: lockTimeout), - debugContext: debugContext ?? 'execute()'); - } - - @override - Future readTransaction( - Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout}) async { - return readLock((ctx) async { - return await internalReadTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'readTransaction()', - isTransaction: true); - } - - @override - Future writeTransaction( - Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout}) async { - return writeLock(( - ctx, - ) async { - return await internalWriteTransaction(ctx, callback); - }, - lockTimeout: lockTimeout, - debugContext: 'writeTransaction()', - isTransaction: true); - } - - /// The mutex on individual connections do already error in recursive locks. - /// - /// We duplicate the same check here, to: - /// 1. Also error when the recursive transaction is handled by a different - /// connection (with a different lock). - /// 2. Give a more specific error message when it happens. - T _runZoned(T Function() callback, {required String debugContext}) { - if (Zone.current[this] != null) { - throw LockError( - 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); - } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); - } - - @override - Future getAutoCommit() async { - return DriftWriteContext(this).getAutoCommit(); - } -} - -class DriftReadContext implements SqliteReadContext { - DriftSqliteConnection db; - bool _closed = false; - - @protected - bool isTransaction; - - DriftReadContext(this.db, {this.isTransaction = false}); - - @override - Future computeWithDatabase( - Future Function(CommonDatabase db) compute) { - throw UnimplementedError(); - } - - @override - Future get(String sql, [List parameters = const []]) async { - return (await getAll(sql, parameters)).first; - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - return db.select(sql, parameters); - } - - @override - Future getOptional(String sql, - [List parameters = const []]) async { - return (await db.select(sql, parameters)).firstOrNull; - } - - @override - bool get closed => _closed; - - close() { - _closed = true; - } - - @override - Future getAutoCommit() async { - final response = await db.select('select $sqliteAsyncAutoCommitCommand()'); - if (response.isEmpty) { - return false; - } - - return response.first.values.first != 0; - } -} - -class DriftWriteContext extends DriftReadContext implements SqliteWriteContext { - DriftWriteContext(super.db, {super.isTransaction}); - - @override - Future execute(String sql, - [List parameters = const []]) async { - return getAll(sql, parameters); - } - - @override - Future getAll(String sql, - [List parameters = const []]) async { - if (_closed) { - throw SqliteException(0, 'Transaction closed', null, sql); - } - - /// Statements in read/writeTransactions should not execute after ROLLBACK - if (isTransaction && - !sql.toLowerCase().contains('begin') && - await getAutoCommit()) { - throw SqliteException(0, - 'Transaction rolled back by earlier statement. Cannot execute: $sql'); - } - return db.select(sql, parameters); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - return db.executeBatch(sql, parameterSets); - } -} - -class DriftSqliteUser extends QueryExecutorUser { - @override - Future beforeOpen( - QueryExecutor executor, OpeningDetails details) async {} - - @override - int get schemaVersion => 1; -} diff --git a/lib/src/web/database/web_sqlite_database.dart b/lib/src/web/database/web_sqlite_database.dart index a92ceac..4cb9fb4 100644 --- a/lib/src/web/database/web_sqlite_database.dart +++ b/lib/src/web/database/web_sqlite_database.dart @@ -80,38 +80,33 @@ class SqliteDatabaseImpl }); } + T _runZoned(T Function() callback, {required String debugContext}) { + if (Zone.current[this] != null) { + throw LockError( + 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); + } + var zone = Zone.current.fork(zoneValues: {this: true}); + return zone.run(callback); + } + @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { await isInitialized; - return _connection.readLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + return _runZoned(() { + return _connection.readLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + }, debugContext: debugContext ?? 'execute()'); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { await isInitialized; - return _connection.writeLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); - } - - @override - Future readTransaction( - Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, - String? debugContext}) async { - await isInitialized; - return _connection.readTransaction(callback, lockTimeout: lockTimeout); - } - - @override - Future writeTransaction( - Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, - String? debugContext}) async { - await isInitialized; - return _connection.writeTransaction(callback, lockTimeout: lockTimeout); + return _runZoned(() { + return _connection.writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + }, debugContext: debugContext ?? 'execute()'); } @override diff --git a/lib/src/web/protocol.dart b/lib/src/web/protocol.dart new file mode 100644 index 0000000..280989e --- /dev/null +++ b/lib/src/web/protocol.dart @@ -0,0 +1,32 @@ +/// Custom requests used by this package to manage locks in shared workers. +@JS() +library; + +import 'dart:js_interop'; + +/// Custom function which exposes CommonDatabase.autocommit +const sqliteAsyncAutoCommitCommand = 'sqlite_async_autocommit'; + +enum CustomDatabaseMessageKind { + requestSharedLock, + requestExclusiveLock, + releaseLock, + lockObtained, + getAutoCommit, +} + +extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { + external factory CustomDatabaseMessage._({ + required JSString rawKind, + }); + + factory CustomDatabaseMessage(CustomDatabaseMessageKind kind) { + return CustomDatabaseMessage._(rawKind: kind.name.toJS); + } + + external JSString get rawKind; + + CustomDatabaseMessageKind get kind { + return CustomDatabaseMessageKind.values.byName(rawKind.toDart); + } +} diff --git a/lib/src/web/web_sqlite_open_factory.dart b/lib/src/web/web_sqlite_open_factory.dart index 8bb7f4e..aea126c 100644 --- a/lib/src/web/web_sqlite_open_factory.dart +++ b/lib/src/web/web_sqlite_open_factory.dart @@ -1,17 +1,28 @@ import 'dart:async'; -import 'package:drift/wasm.dart'; import 'package:sqlite3/wasm.dart'; +import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite_async/sqlite_async.dart'; -import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; import 'package:sqlite_async/src/web/web_mutex.dart'; +import 'database.dart'; + /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory { + // todo: For users with multiple databases, the WebSqlite should be shared + // between the different paths. + final Future _initialized; + DefaultSqliteOpenFactory( {required super.path, - super.sqliteOptions = const SqliteOptions.defaults()}); + super.sqliteOptions = const SqliteOptions.defaults()}) + : _initialized = Future.sync(() { + return WebSqlite.open( + wasmModule: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), + worker: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), + ); + }); @override @@ -37,15 +48,17 @@ class DefaultSqliteOpenFactory /// and automatic persistence storage selection. /// Due to being asynchronous, the under laying CommonDatabase is not accessible Future openConnection(SqliteOpenOptions options) async { - final db = await WasmDatabase.open( - databaseName: path, - sqlite3Uri: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), - driftWorkerUri: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), - ); + final workers = await _initialized; + final connection = await workers.connectToRecommended(path); - await db.resolvedExecutor.ensureOpen(DriftSqliteUser()); + // When the database is accessed through a shared worker, we implement + // mutexes over custom messages sent through the shared worker. In other + // cases, we need to implement a mutex locally. + final mutex = connection.access == AccessMode.throughSharedWorker + ? null + : MutexImpl(); - return DriftSqliteConnection(db, options.mutex ?? MutexImpl()); + return WebDatabase(connection.database, options.mutex ?? mutex); } @override diff --git a/lib/src/web/worker/drift_worker.dart b/lib/src/web/worker/drift_worker.dart deleted file mode 100644 index 4ef3956..0000000 --- a/lib/src/web/worker/drift_worker.dart +++ /dev/null @@ -1,21 +0,0 @@ -/// This is an example of a database worker script -/// Custom database logic can be achieved by implementing this template -/// This file needs to be compiled to JavaScript with the command: -/// dart compile js -O4 lib/src/web/worker/db_worker.dart -o build/db_worker.js -/// The output should then be included in each project's `web` directory -library; - -import 'package:sqlite_async/drift.dart'; -import 'package:sqlite_async/sqlite3_common.dart'; - -/// Use this function to register any custom DB functionality -/// which requires direct access to the connection -void setupDatabase(CommonDatabase database) { - setupCommonWorkerDB(database); -} - -void main() { - WasmDatabase.workerMainForOpen( - setupAllDatabases: setupDatabase, - ); -} diff --git a/lib/src/web/worker/worker.dart b/lib/src/web/worker/worker.dart new file mode 100644 index 0000000..52798b2 --- /dev/null +++ b/lib/src/web/worker/worker.dart @@ -0,0 +1,68 @@ +/// This is an example of a database worker script +/// Custom database logic can be achieved by implementing this template +/// This file needs to be compiled to JavaScript with the command: +/// dart compile js -O4 lib/src/web/worker/db_worker.dart -o build/db_worker.js +/// The output should then be included in each project's `web` directory +library; + +import 'dart:js_interop'; + +import 'package:mutex/mutex.dart'; +import 'package:sqlite3/wasm.dart'; +import 'package:sqlite3_web/sqlite3_web.dart'; + +import '../protocol.dart'; +import 'worker_utils.dart'; + +void main() { + WebSqlite.workerEntrypoint(controller: _AsyncSqliteController()); +} + +final class _AsyncSqliteController extends DatabaseController { + @override + Future openDatabase(WasmSqlite3 sqlite3, String vfs) async { + final db = sqlite3.open('/app.db', vfs: vfs); + setupCommonWorkerDB(db); + + return _AsyncSqliteDatabase(database: db); + } + + @override + Future handleCustomRequest( + ClientConnection connection, JSAny? request) { + throw UnimplementedError(); + } +} + +class _AsyncSqliteDatabase extends WorkerDatabase { + @override + final CommonDatabase database; + + // This mutex is only used for lock requests from clients. Clients only send + // these requests for shared workers, so we can assume each database is only + // opened once and we don't need web locks here. + final mutex = ReadWriteMutex(); + + _AsyncSqliteDatabase({required this.database}); + + @override + Future handleCustomRequest( + ClientConnection connection, JSAny? request) async { + final message = request as CustomDatabaseMessage; + + switch (message.kind) { + case CustomDatabaseMessageKind.requestSharedLock: + await mutex.acquireRead(); + case CustomDatabaseMessageKind.requestExclusiveLock: + await mutex.acquireWrite(); + case CustomDatabaseMessageKind.releaseLock: + mutex.release(); + case CustomDatabaseMessageKind.lockObtained: + throw UnsupportedError('This is a response, not a request'); + case CustomDatabaseMessageKind.getAutoCommit: + return database.autocommit.toJS; + } + + return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); + } +} diff --git a/lib/src/web/worker/worker_utils.dart b/lib/src/web/worker/worker_utils.dart index 46c276b..9faa342 100644 --- a/lib/src/web/worker/worker_utils.dart +++ b/lib/src/web/worker/worker_utils.dart @@ -1,5 +1,6 @@ import 'package:sqlite_async/sqlite3_common.dart'; -import 'package:sqlite_async/src/web/database/connection/drift_sqlite_connection.dart'; + +import '../protocol.dart'; void setupCommonWorkerDB(CommonDatabase database) { /// Exposes autocommit via a query function diff --git a/pubspec.yaml b/pubspec.yaml index 4a8ad38..3db1122 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -3,18 +3,11 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.7.0-alpha.2 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: '>=3.2.0 <4.0.0' + sdk: '>=3.3.0 <4.0.0' dependencies: - drift: ^2.15.0 - # In order to compile custom web worker - # drift: - # git: - # url: https://github.com/powersync-ja/drift.git - # ref: test # branch name - # path: drift sqlite3: '^2.3.0' - js: ^0.6.3 + sqlite3_web: any # todo publish async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 @@ -31,6 +24,18 @@ dev_dependencies: stream_channel: ^2.1.2 path: ^1.9.0 +dependency_overrides: + sqlite3: + git: + url: https://github.com/simolus3/sqlite3.dart.git + ref: web + path: sqlite3 + sqlite3_web: + git: + url: https://github.com/simolus3/sqlite3.dart.git + ref: web + path: sqlite3_web + platforms: android: ios: diff --git a/test/server/worker_server.dart b/test/server/worker_server.dart index 394ff89..a8ecc6c 100644 --- a/test/server/worker_server.dart +++ b/test/server/worker_server.dart @@ -27,7 +27,7 @@ Future hybridMain(StreamChannel channel) async { '-o', driftWorkerPath, '-O0', - 'lib/src/web/worker/drift_worker.dart', + 'lib/src/web/worker/worker.dart', ]); if (process.exitCode != 0) { fail('Could not compile worker');