Skip to content
Merged
210 changes: 131 additions & 79 deletions lib/src/connection_pool.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:collection';

import 'mutex.dart';
import 'port_channel.dart';
Expand All @@ -12,7 +13,9 @@ import 'update_notification.dart';
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
SqliteConnection? _writeConnection;

final List<SqliteConnectionImpl> _readConnections = [];
final Set<SqliteConnectionImpl> _allReadConnections = {};
final Queue<SqliteConnectionImpl> _availableReadConnections = Queue();
final Queue<_PendingItem> _queue = Queue();

final SqliteOpenFactory _factory;
final SerializedPortClient _upstreamPort;
Expand Down Expand Up @@ -53,72 +56,84 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
@override
Future<bool> getAutoCommit() async {
if (_writeConnection == null) {
throw AssertionError('Closed');
throw ClosedException();
}
return await _writeConnection!.getAutoCommit();
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
await _expandPool();

return _runZoned(() async {
bool haveLock = false;
var completer = Completer<T>();
void _nextRead() {
if (_queue.isEmpty) {
// Wait for queue item
return;
} else if (closed) {
while (_queue.isNotEmpty) {
final nextItem = _queue.removeFirst();
nextItem.completer.completeError(const ClosedException());
}
return;
}

var futures = _readConnections.sublist(0).map((connection) async {
if (connection.closed) {
_readConnections.remove(connection);
}
try {
return await connection.readLock((ctx) async {
if (haveLock) {
// Already have a different lock - release this one.
return false;
}
haveLock = true;

var future = callback(ctx);
completer.complete(future);

// We have to wait for the future to complete before we can release the
// lock.
try {
await future;
} catch (_) {
// Ignore
}

return true;
}, lockTimeout: lockTimeout, debugContext: debugContext);
} on TimeoutException {
return false;
}
});
while (_availableReadConnections.isNotEmpty &&
_availableReadConnections.last.closed) {
// Remove connections that may have errored
final connection = _availableReadConnections.removeLast();
_allReadConnections.remove(connection);
}

final stream = Stream<bool>.fromFutures(futures);
var gotAny = await stream.any((element) => element);
if (_availableReadConnections.isEmpty &&
_allReadConnections.length == maxReaders) {
// Wait for available connection
return;
}

if (!gotAny) {
// All TimeoutExceptions
throw TimeoutException('Failed to get a read connection', lockTimeout);
var nextItem = _queue.removeFirst();
while (nextItem.completer.isCompleted) {
// This item already timed out - try the next one if available
if (_queue.isEmpty) {
return;
}
nextItem = _queue.removeFirst();
}

nextItem.lockTimer?.cancel();

nextItem.completer.complete(Future.sync(() async {
final nextConnection = _availableReadConnections.isEmpty
? await _expandPool()
: _availableReadConnections.removeLast();
try {
return await completer.future;
} catch (e) {
// throw e;
rethrow;
// At this point the connection is expected to be available immediately.
// No need to calculate a new lockTimeout here.
final result = await nextConnection.readLock(nextItem.callback);
return result;
} finally {
_availableReadConnections.add(nextConnection);
Timer.run(_nextRead);
}
}, debugContext: debugContext ?? 'get*()');
}));
}

@override
Future<T> readLock<T>(ReadCallback<T> callback,
{Duration? lockTimeout, String? debugContext}) async {
if (closed) {
throw ClosedException();
}
final zone = _getZone(debugContext: debugContext ?? 'get*()');
final item = _PendingItem((ctx) {
return zone.runUnary(callback, ctx);
}, lockTimeout: lockTimeout);
_queue.add(item);
_nextRead();

return (await item.future) as T;
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
if (closed) {
throw AssertionError('Closed');
throw ClosedException();
}
if (_writeConnection?.closed == true) {
_writeConnection = null;
Expand All @@ -144,46 +159,52 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
/// connection (with a different lock).
/// 2. Give a more specific error message when it happens.
T _runZoned<T>(T Function() callback, {required String debugContext}) {
return _getZone(debugContext: debugContext).run(callback);
}

Zone _getZone({required String debugContext}) {
if (Zone.current[this] != null) {
throw LockError(
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
}
var zone = Zone.current.fork(zoneValues: {this: true});
return zone.run(callback);
return Zone.current.fork(zoneValues: {this: true});
}

Future<void> _expandPool() async {
if (closed || _readConnections.length >= maxReaders) {
return;
}
bool hasCapacity = _readConnections.any((connection) => !connection.locked);
if (!hasCapacity) {
var name = debugName == null
? null
: '$debugName-${_readConnections.length + 1}';
var connection = SqliteConnectionImpl(
upstreamPort: _upstreamPort,
primary: false,
updates: updates,
debugName: name,
mutex: mutex,
readOnly: true,
openFactory: _factory);
_readConnections.add(connection);

// Edge case:
// If we don't await here, there is a chance that a different connection
// is used for the transaction, and that it finishes and deletes the database
// while this one is still opening. This is specifically triggered in tests.
// To avoid that, we wait for the connection to be ready.
await connection.ready;
}
Future<SqliteConnectionImpl> _expandPool() async {
var name = debugName == null
? null
: '$debugName-${_allReadConnections.length + 1}';
var connection = SqliteConnectionImpl(
upstreamPort: _upstreamPort,
primary: false,
updates: updates,
debugName: name,
mutex: mutex,
readOnly: true,
openFactory: _factory);
_allReadConnections.add(connection);

// Edge case:
// If we don't await here, there is a chance that a different connection
// is used for the transaction, and that it finishes and deletes the database
// while this one is still opening. This is specifically triggered in tests.
// To avoid that, we wait for the connection to be ready.
await connection.ready;
return connection;
}

@override
Future<void> close() async {
closed = true;
for (var connection in _readConnections) {

// It is possible that `readLock()` removes connections from the pool while we're
// closing connections, but not possible for new connections to be added.
// Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
final toClose = _allReadConnections.toList();
for (var connection in toClose) {
// Wait for connection initialization, so that any existing readLock()
// requests go through before closing.
await connection.ready;
await connection.close();
}
// Closing the write connection cleans up the journal files (-shm and -wal files).
Expand All @@ -192,3 +213,34 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
await _writeConnection?.close();
}
}

typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);

class _PendingItem {
ReadCallback<dynamic> callback;
Completer<dynamic> completer = Completer.sync();
late Future<dynamic> future = completer.future;
DateTime? deadline;
final Duration? lockTimeout;
late final Timer? lockTimer;

_PendingItem(this.callback, {this.lockTimeout}) {
if (lockTimeout != null) {
deadline = DateTime.now().add(lockTimeout!);
lockTimer = Timer(lockTimeout!, () {
// Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available.
// This matches the behavior we need for a timeout on the lock, but not the entire operation.
if (!completer.isCompleted) {
// completer.completeError(
// TimeoutException('Failed to get a read connection', lockTimeout));
completer.complete(Future.sync(() async {
throw TimeoutException(
'Failed to get a read connection', lockTimeout);
}));
}
});
} else {
lockTimer = null;
}
}
}
Loading