1
1
import 'dart:async' ;
2
2
3
- import 'package:sqlite_async/src/common/abstract_open_factory.dart' ;
4
- import 'package:sqlite_async/src/common/mutex.dart' ;
5
- import 'package:sqlite_async/src/common/port_channel.dart' ;
3
+ import 'package:sqlite_async/sqlite_async.dart' ;
6
4
import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart' ;
7
5
import 'package:sqlite_async/src/native/native_isolate_mutex.dart' ;
8
- import 'package:sqlite_async/src/sqlite_connection.dart' ;
9
- import 'package:sqlite_async/src/sqlite_queries.dart' ;
10
- import 'package:sqlite_async/src/update_notification.dart' ;
11
6
12
7
/// A connection pool with a single write connection and multiple read connections.
13
8
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
14
- SqliteConnection ? _writeConnection;
9
+ final StreamController <UpdateNotification > updatesController =
10
+ StreamController .broadcast ();
11
+
12
+ @override
13
+
14
+ /// The write connection might be recreated if it's closed
15
+ /// This will allow the update stream remain constant even
16
+ /// after using a new write connection.
17
+ late final Stream <UpdateNotification > updates = updatesController.stream;
18
+
19
+ SqliteConnectionImpl ? _writeConnection;
15
20
16
21
final List <SqliteConnectionImpl > _readConnections = [];
17
22
18
23
final AbstractDefaultSqliteOpenFactory _factory;
19
- final SerializedPortClient _upstreamPort;
20
-
21
- @override
22
- final Stream <UpdateNotification >? updates;
23
24
24
25
final int maxReaders;
25
26
@@ -41,14 +42,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
41
42
/// Read connections are opened in read-only mode, and will reject any statements
42
43
/// that modify the database.
43
44
SqliteConnectionPool (this ._factory,
44
- {this .updates,
45
- this .maxReaders = 5 ,
46
- SqliteConnection ? writeConnection,
45
+ {this .maxReaders = 5 ,
46
+ SqliteConnectionImpl ? writeConnection,
47
47
this .debugName,
48
- required this .mutex,
49
- required SerializedPortClient upstreamPort})
50
- : _writeConnection = writeConnection,
51
- _upstreamPort = upstreamPort;
48
+ required this .mutex})
49
+ : _writeConnection = writeConnection {
50
+ // Use the write connection's updates
51
+ _writeConnection? .updates? .forEach (updatesController.add);
52
+ }
52
53
53
54
/// Returns true if the _write_ connection is currently in autocommit mode.
54
55
@override
@@ -117,21 +118,24 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
117
118
118
119
@override
119
120
Future <T > writeLock <T >(Future <T > Function (SqliteWriteContext tx) callback,
120
- {Duration ? lockTimeout, String ? debugContext}) {
121
+ {Duration ? lockTimeout, String ? debugContext}) async {
121
122
if (closed) {
122
123
throw AssertionError ('Closed' );
123
124
}
124
125
if (_writeConnection? .closed == true ) {
125
126
_writeConnection = null ;
126
127
}
127
- _writeConnection ?? = SqliteConnectionImpl (
128
- upstreamPort: _upstreamPort,
129
- primary: false ,
130
- updates: updates,
131
- debugName: debugName != null ? '$debugName -writer' : null ,
132
- mutex: mutex,
133
- readOnly: false ,
134
- openFactory: _factory);
128
+
129
+ if (_writeConnection == null ) {
130
+ _writeConnection = (await _factory.openConnection (SqliteOpenOptions (
131
+ primaryConnection: true ,
132
+ debugName: debugName != null ? '$debugName -writer' : null ,
133
+ mutex: mutex,
134
+ readOnly: false ))) as SqliteConnectionImpl ;
135
+ // Expose the new updates on the connection pool
136
+ _writeConnection! .updates? .forEach (updatesController.add);
137
+ }
138
+
135
139
return _runZoned (() {
136
140
return _writeConnection! .writeLock (callback,
137
141
lockTimeout: lockTimeout, debugContext: debugContext);
@@ -163,7 +167,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
163
167
? null
164
168
: '$debugName -${_readConnections .length + 1 }' ;
165
169
var connection = SqliteConnectionImpl (
166
- upstreamPort: _upstreamPort ,
170
+ upstreamPort: _writeConnection ? .upstreamPort ,
167
171
primary: false ,
168
172
updates: updates,
169
173
debugName: name,
@@ -181,6 +185,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
181
185
}
182
186
}
183
187
188
+ SerializedPortClient ? get upstreamPort {
189
+ return _writeConnection? .upstreamPort;
190
+ }
191
+
184
192
@override
185
193
Future <void > close () async {
186
194
closed = true ;
0 commit comments