From eef16ade3d7ea7fb02dc34ffc90694bdf710ff8e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:04:17 +0200 Subject: [PATCH 01/11] Add failing test. --- test/basic_test.dart | 24 ++++++++++++++++++++++++ test/util.dart | 25 ++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/test/basic_test.dart b/test/basic_test.dart index 9b563c8..2e4804a 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -368,6 +368,30 @@ void main() { expect(await savedTx!.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); + + test('closing', () async { + // Test race condition in SqliteConnectionPool: + // 1. Open two concurrent queries, which opens two connection. + // 2. Second connection takes longer to open than first. + // 3. Call db.close(). + // 4. Now second connection is ready. Second query has two connections to choose from. + // 5. However, first connection is closed, so it's removed from the pool. + // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` + + final db = + SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ + // Second connection to sleep more than first connection + 'SELECT test_sleep(test_connection_number() * 10)' + ])); + await createTables(db); + + final future1 = db.get('SELECT test_sleep(10) as sleep'); + final future2 = db.get('SELECT test_sleep(10) as sleep'); + final closeFuture = db.close(); + await closeFuture; + await future1; + await future2; + }); }); } diff --git a/test/util.dart b/test/util.dart index 12458fc..5bcbb50 100644 --- a/test/util.dart +++ b/test/util.dart @@ -14,11 +14,13 @@ const defaultSqlitePath = 'libsqlite3.so.0'; class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { String sqlitePath; + List initStatements; TestSqliteOpenFactory( {required super.path, super.sqliteOptions, - this.sqlitePath = defaultSqlitePath}); + this.sqlitePath = defaultSqlitePath, + this.initStatements = const []}); @override sqlite.Database open(SqliteOpenOptions options) { @@ -45,12 +47,29 @@ class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { }, ); + db.createFunction( + functionName: 'test_connection_number', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + // write: 0, read: 1 - 5 + final name = Isolate.current.debugName ?? '-0'; + var nr = name.split('-').last; + return int.tryParse(nr) ?? 0; + }, + ); + + for (var s in initStatements) { + db.execute(s); + } + return db; } } -SqliteOpenFactory testFactory({String? path}) { - return TestSqliteOpenFactory(path: path ?? dbPath()); +SqliteOpenFactory testFactory( + {String? path, List initStatements = const []}) { + return TestSqliteOpenFactory( + path: path ?? dbPath(), initStatements: initStatements); } Future setupDatabase({String? path}) async { From 84173344b0e2b3fed0be4553cb2a6c5e8cd778b9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:04:48 +0200 Subject: [PATCH 02/11] Use ClosedException. --- lib/src/connection_pool.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 23b1e0f..5b5316b 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -53,7 +53,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future getAutoCommit() async { if (_writeConnection == null) { - throw AssertionError('Closed'); + throw ClosedException(); } return await _writeConnection!.getAutoCommit(); } @@ -118,7 +118,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { if (closed) { - throw AssertionError('Closed'); + throw ClosedException(); } if (_writeConnection?.closed == true) { _writeConnection = null; From a7221567a4427913609c12ffd69528748fba2ce7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:07:10 +0200 Subject: [PATCH 03/11] Fix "Concurrent modification during iteration" errors. --- lib/src/connection_pool.dart | 7 ++++++- test/basic_test.dart | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 5b5316b..6a99a59 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -183,7 +183,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future close() async { closed = true; - for (var connection in _readConnections) { + + // It is possible that `readLock()` removes connections from the pool while we're + // closing connections, but not possible for new connections to be added. + // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" + final toClose = _readConnections.sublist(0); + for (var connection in toClose) { await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). diff --git a/test/basic_test.dart b/test/basic_test.dart index 2e4804a..4bc9a42 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -383,7 +383,6 @@ void main() { // Second connection to sleep more than first connection 'SELECT test_sleep(test_connection_number() * 10)' ])); - await createTables(db); final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); From dea85050b910e2c7629151370e8845f648095b38 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:39:29 +0200 Subject: [PATCH 04/11] Handle ClosedException in SqliteConnectionPool. --- lib/src/connection_pool.dart | 2 ++ test/basic_test.dart | 1 + 2 files changed, 3 insertions(+) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 6a99a59..f6ae300 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -94,6 +94,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { }, lockTimeout: lockTimeout, debugContext: debugContext); } on TimeoutException { return false; + } on ClosedException { + return false; } }); diff --git a/test/basic_test.dart b/test/basic_test.dart index 4bc9a42..8ed43dc 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -383,6 +383,7 @@ void main() { // Second connection to sleep more than first connection 'SELECT test_sleep(test_connection_number() * 10)' ])); + await db.initialize(); final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); From dd2bce05cca0951ae53f55b9a33e75a01417ffa5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:41:56 +0200 Subject: [PATCH 05/11] Fix race condition in closing. --- lib/src/connection_pool.dart | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index f6ae300..7852b8e 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -191,6 +191,9 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" final toClose = _readConnections.sublist(0); for (var connection in toClose) { + // Wait for connection initialization, so that any existing readLock() + // requests go through before closing. + await connection.ready; await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). From 5cf85db6ebd9fafa0c3c2780f99c37cc0cd27b0c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:10:01 +0200 Subject: [PATCH 06/11] Slightly better handling of connection initialization errors. --- lib/src/port_channel.dart | 24 +++++++++++++++++++----- lib/src/sqlite_connection_impl.dart | 4 ++++ test/basic_test.dart | 6 +++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index c32b7b9..9701023 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -18,7 +18,8 @@ abstract class PortClient { class ParentPortClient implements PortClient { late Future sendPortFuture; SendPort? sendPort; - ReceivePort receivePort = ReceivePort(); + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); bool closed = false; int _nextId = 1; @@ -30,7 +31,7 @@ class ParentPortClient implements PortClient { sendPortFuture.then((value) { sendPort = value; }); - receivePort.listen((message) { + _receivePort.listen((message) { if (message is _InitMessage) { assert(!initCompleter.isCompleted); initCompleter.complete(message.port); @@ -57,6 +58,17 @@ class ParentPortClient implements PortClient { } close(); }); + _errorPort.listen((message) { + var [error, stackTrace] = message; + print('got an error ${initCompleter.isCompleted} $error'); + if (!initCompleter.isCompleted) { + if (stackTrace == null) { + initCompleter.completeError(error); + } else { + initCompleter.completeError(error, StackTrace.fromString(stackTrace)); + } + } + }); } Future get ready async { @@ -94,20 +106,22 @@ class ParentPortClient implements PortClient { } RequestPortServer server() { - return RequestPortServer(receivePort.sendPort); + return RequestPortServer(_receivePort.sendPort); } void close() async { if (!closed) { closed = true; - receivePort.close(); + _receivePort.close(); + _errorPort.close(); _cancelAll(const ClosedException()); } } tieToIsolate(Isolate isolate) { - isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage); + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } } diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index 643152e..7057a73 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -88,6 +88,9 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future close() async { await _connectionMutex.lock(() async { + if (closed) { + return; + } if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); } else { @@ -97,6 +100,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { await _isolateClient.post(const _SqliteIsolateConnectionClose()); }); } + _isolateClient.close(); _isolate.kill(); }); } diff --git a/test/basic_test.dart b/test/basic_test.dart index 8ed43dc..113ccb2 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -377,7 +377,6 @@ void main() { // 4. Now second connection is ready. Second query has two connections to choose from. // 5. However, first connection is closed, so it's removed from the pool. // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` - final db = SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ // Second connection to sleep more than first connection @@ -387,8 +386,9 @@ void main() { final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); - final closeFuture = db.close(); - await closeFuture; + + await db.close(); + await future1; await future2; }); From 3b79bce346cc9dfe800f2f9b6cd5eee0d92c7073 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:14:15 +0200 Subject: [PATCH 07/11] Cleanup. --- lib/src/port_channel.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index 9701023..4b952bf 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -60,7 +60,6 @@ class ParentPortClient implements PortClient { }); _errorPort.listen((message) { var [error, stackTrace] = message; - print('got an error ${initCompleter.isCompleted} $error'); if (!initCompleter.isCompleted) { if (stackTrace == null) { initCompleter.completeError(error); From 1af625b2d6c6ad6e9cce811644253eccb6b312c5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:40:12 +0200 Subject: [PATCH 08/11] Improve handling of uncaught errors in Isolates. --- lib/src/connection_pool.dart | 3 +- lib/src/port_channel.dart | 58 ++++++++++++++++++++++++++++-------- test/basic_test.dart | 15 +++++++--- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 7852b8e..d0c8912 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -158,7 +158,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (closed || _readConnections.length >= maxReaders) { return; } - bool hasCapacity = _readConnections.any((connection) => !connection.locked); + bool hasCapacity = _readConnections + .any((connection) => !connection.locked && !connection.closed); if (!hasCapacity) { var name = debugName == null ? null diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index 4b952bf..8b05feb 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -21,6 +21,8 @@ class ParentPortClient implements PortClient { final ReceivePort _receivePort = ReceivePort(); final ReceivePort _errorPort = ReceivePort(); bool closed = false; + Object? _closeError; + String? _isolateDebugName; int _nextId = 1; Map> handlers = HashMap(); @@ -59,14 +61,15 @@ class ParentPortClient implements PortClient { close(); }); _errorPort.listen((message) { - var [error, stackTrace] = message; + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); if (!initCompleter.isCompleted) { - if (stackTrace == null) { - initCompleter.completeError(error); - } else { - initCompleter.completeError(error, StackTrace.fromString(stackTrace)); - } + initCompleter.completeError(error, stackTrace); } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); }); } @@ -74,18 +77,18 @@ class ParentPortClient implements PortClient { await sendPortFuture; } - void _cancelAll(Object error) { + void _cancelAll(Object error, [StackTrace? stackTrace]) { var handlers = this.handlers; this.handlers = {}; for (var message in handlers.values) { - message.completeError(error); + message.completeError(error, stackTrace); } } @override Future post(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? const ClosedException(); } var completer = Completer.sync(); var id = _nextId++; @@ -98,7 +101,7 @@ class ParentPortClient implements PortClient { @override void fire(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? ClosedException(); } final port = sendPort ?? await sendPortFuture; port.send(_FireMessage(message)); @@ -108,17 +111,27 @@ class ParentPortClient implements PortClient { return RequestPortServer(_receivePort.sendPort); } - void close() async { + void _close([Object? error, StackTrace? stackTrace]) { if (!closed) { closed = true; _receivePort.close(); _errorPort.close(); - _cancelAll(const ClosedException()); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } } } + void close() { + _close(); + } + tieToIsolate(Isolate isolate) { + _isolateDebugName = isolate.debugName; isolate.addErrorListener(_errorPort.sendPort); isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } @@ -274,6 +287,27 @@ class _RequestMessage { class ClosedException implements Exception { const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } } class _PortChannelResult { diff --git a/test/basic_test.dart b/test/basic_test.dart index 113ccb2..c3ad150 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -4,6 +4,7 @@ import 'dart:math'; import 'package:sqlite3/sqlite3.dart' as sqlite; import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:test/expect.dart'; import 'package:test/test.dart'; import 'util.dart'; @@ -301,8 +302,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific error message may change + expect( + caughtError.toString(), + equals( + "IsolateError in sqlite-writer: Invalid argument(s): uncaught async error")); // Check that we can still continue afterwards final computed = await db.computeWithDatabase((db) async { @@ -328,8 +332,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific message may change + expect( + caughtError.toString(), + matches(RegExp( + r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error'))); } // Check that we can still continue afterwards From c01e637ba3a274c8a9031b76114de1a3c3ddf728 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 15:59:47 +0200 Subject: [PATCH 09/11] Rewrite connection pooling queue. --- lib/src/connection_pool.dart | 173 ++++++++++++++++++----------------- 1 file changed, 90 insertions(+), 83 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index d0c8912..57da980 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:collection'; import 'mutex.dart'; import 'port_channel.dart'; @@ -12,7 +13,9 @@ import 'update_notification.dart'; class SqliteConnectionPool with SqliteQueries implements SqliteConnection { SqliteConnection? _writeConnection; - final List _readConnections = []; + final Set _allReadConnections = {}; + final Queue _availableReadConnections = Queue(); + final Queue<_PendingItem> _queue = Queue(); final SqliteOpenFactory _factory; final SerializedPortClient _upstreamPort; @@ -58,62 +61,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return await _writeConnection!.getAutoCommit(); } - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { - await _expandPool(); - - return _runZoned(() async { - bool haveLock = false; - var completer = Completer(); - - var futures = _readConnections.sublist(0).map((connection) async { - if (connection.closed) { - _readConnections.remove(connection); - } - try { - return await connection.readLock((ctx) async { - if (haveLock) { - // Already have a different lock - release this one. - return false; - } - haveLock = true; - - var future = callback(ctx); - completer.complete(future); - - // We have to wait for the future to complete before we can release the - // lock. - try { - await future; - } catch (_) { - // Ignore - } - - return true; - }, lockTimeout: lockTimeout, debugContext: debugContext); - } on TimeoutException { - return false; - } on ClosedException { - return false; - } - }); - - final stream = Stream.fromFutures(futures); - var gotAny = await stream.any((element) => element); - - if (!gotAny) { - // All TimeoutExceptions - throw TimeoutException('Failed to get a read connection', lockTimeout); + void _nextRead() { + if (_queue.isEmpty) { + // Wait for queue item + return; + } else if (closed) { + while (_queue.isNotEmpty) { + final nextItem = _queue.removeFirst(); + nextItem.completer.completeError(const ClosedException()); } + return; + } + + while (_availableReadConnections.isNotEmpty && + _availableReadConnections.last.closed) { + // Remove connections that may have errored + final connection = _availableReadConnections.removeLast(); + _allReadConnections.remove(connection); + } + + if (_availableReadConnections.isEmpty && + _allReadConnections.length == maxReaders) { + // Wait for available connection + return; + } + final nextItem = _queue.removeFirst(); + nextItem.completer.complete(Future.sync(() async { + final nextConnection = _availableReadConnections.isEmpty + ? await _expandPool() + : _availableReadConnections.removeLast(); try { - return await completer.future; - } catch (e) { - // throw e; - rethrow; + final result = await nextConnection.readLock(nextItem.callback); + return result; + } finally { + _availableReadConnections.add(nextConnection); + _nextRead(); } - }, debugContext: debugContext ?? 'get*()'); + })); + } + + @override + Future readLock(ReadCallback callback, + {Duration? lockTimeout, String? debugContext}) async { + if (closed) { + throw ClosedException(); + } + final zone = _getZone(debugContext: debugContext ?? 'get*()'); + final item = _PendingItem((ctx) { + return zone.runUnary(callback, ctx); + }); + _queue.add(item); + _nextRead(); + + return await item.completer.future; } @override @@ -146,41 +147,38 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// connection (with a different lock). /// 2. Give a more specific error message when it happens. T _runZoned(T Function() callback, {required String debugContext}) { + return _getZone(debugContext: debugContext).run(callback); + } + + Zone _getZone({required String debugContext}) { if (Zone.current[this] != null) { throw LockError( 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); + return Zone.current.fork(zoneValues: {this: true}); } - Future _expandPool() async { - if (closed || _readConnections.length >= maxReaders) { - return; - } - bool hasCapacity = _readConnections - .any((connection) => !connection.locked && !connection.closed); - if (!hasCapacity) { - var name = debugName == null - ? null - : '$debugName-${_readConnections.length + 1}'; - var connection = SqliteConnectionImpl( - upstreamPort: _upstreamPort, - primary: false, - updates: updates, - debugName: name, - mutex: mutex, - readOnly: true, - openFactory: _factory); - _readConnections.add(connection); - - // Edge case: - // If we don't await here, there is a chance that a different connection - // is used for the transaction, and that it finishes and deletes the database - // while this one is still opening. This is specifically triggered in tests. - // To avoid that, we wait for the connection to be ready. - await connection.ready; - } + Future _expandPool() async { + var name = debugName == null + ? null + : '$debugName-${_allReadConnections.length + 1}'; + var connection = SqliteConnectionImpl( + upstreamPort: _upstreamPort, + primary: false, + updates: updates, + debugName: name, + mutex: mutex, + readOnly: true, + openFactory: _factory); + _allReadConnections.add(connection); + + // Edge case: + // If we don't await here, there is a chance that a different connection + // is used for the transaction, and that it finishes and deletes the database + // while this one is still opening. This is specifically triggered in tests. + // To avoid that, we wait for the connection to be ready. + await connection.ready; + return connection; } @override @@ -190,7 +188,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { // It is possible that `readLock()` removes connections from the pool while we're // closing connections, but not possible for new connections to be added. // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" - final toClose = _readConnections.sublist(0); + final toClose = _allReadConnections.toList(); for (var connection in toClose) { // Wait for connection initialization, so that any existing readLock() // requests go through before closing. @@ -203,3 +201,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await _writeConnection?.close(); } } + +typedef ReadCallback = Future Function(SqliteReadContext tx); + +class _PendingItem { + ReadCallback callback; + Completer completer = Completer.sync(); + + _PendingItem(this.callback); +} From 726197ffab4be8d280627ef3452427cbb3f392a1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 17:17:06 +0200 Subject: [PATCH 10/11] Re-implement lockTimeout. --- lib/src/connection_pool.dart | 46 +++++++++++++++++++++++++++++++----- test/basic_test.dart | 24 +++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 57da980..3a08dbb 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -86,17 +86,29 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } - final nextItem = _queue.removeFirst(); + var nextItem = _queue.removeFirst(); + while (nextItem.completer.isCompleted) { + // This item already timed out - try the next one if available + if (_queue.isEmpty) { + return; + } + nextItem = _queue.removeFirst(); + } + + nextItem.lockTimer?.cancel(); + nextItem.completer.complete(Future.sync(() async { final nextConnection = _availableReadConnections.isEmpty ? await _expandPool() : _availableReadConnections.removeLast(); try { + // At this point the connection is expected to be available immediately. + // No need to calculate a new lockTimeout here. final result = await nextConnection.readLock(nextItem.callback); return result; } finally { _availableReadConnections.add(nextConnection); - _nextRead(); + Timer.run(_nextRead); } })); } @@ -110,11 +122,11 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final zone = _getZone(debugContext: debugContext ?? 'get*()'); final item = _PendingItem((ctx) { return zone.runUnary(callback, ctx); - }); + }, lockTimeout: lockTimeout); _queue.add(item); _nextRead(); - return await item.completer.future; + return (await item.future) as T; } @override @@ -207,6 +219,28 @@ typedef ReadCallback = Future Function(SqliteReadContext tx); class _PendingItem { ReadCallback callback; Completer completer = Completer.sync(); - - _PendingItem(this.callback); + late Future future = completer.future; + DateTime? deadline; + final Duration? lockTimeout; + late final Timer? lockTimer; + + _PendingItem(this.callback, {this.lockTimeout}) { + if (lockTimeout != null) { + deadline = DateTime.now().add(lockTimeout!); + lockTimer = Timer(lockTimeout!, () { + // Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available. + // This matches the behavior we need for a timeout on the lock, but not the entire operation. + if (!completer.isCompleted) { + // completer.completeError( + // TimeoutException('Failed to get a read connection', lockTimeout)); + completer.complete(Future.sync(() async { + throw TimeoutException( + 'Failed to get a read connection', lockTimeout); + })); + } + }); + } else { + lockTimer = null; + } + } } diff --git a/test/basic_test.dart b/test/basic_test.dart index c3ad150..3d561a3 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:math'; +import 'dart:typed_data'; import 'package:sqlite3/sqlite3.dart' as sqlite; import 'package:sqlite_async/mutex.dart'; @@ -399,6 +400,29 @@ void main() { await future1; await future2; }); + + test('lockTimeout', () async { + final db = + SqliteDatabase.withFactory(testFactory(path: path), maxReaders: 2); + await db.initialize(); + + final f1 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + final f2 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + // At this point, both read connections are in use + await expectLater(() async { + await db.readLock((tx) async { + await tx.get('select test_sleep(10)'); + }, lockTimeout: const Duration(milliseconds: 2)); + }, throwsA((e) => e is TimeoutException)); + + await Future.wait([f1, f2]); + }); }); } From 275b38dc81d870c89e238e06d1ac662edc169b0a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 17:19:50 +0200 Subject: [PATCH 11/11] Fix imports. --- test/basic_test.dart | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/basic_test.dart b/test/basic_test.dart index 3d561a3..f0713e4 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -1,11 +1,9 @@ import 'dart:async'; import 'dart:math'; -import 'dart:typed_data'; import 'package:sqlite3/sqlite3.dart' as sqlite; import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite_async.dart'; -import 'package:test/expect.dart'; import 'package:test/test.dart'; import 'util.dart';