Skip to content

v0.5.1: Close write connection after read connections; Fix watch with query parameters #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.5.1

- Fix `watch` when called with query parameters.
- Clean up `-wal` and `-shm` files on close.

## 0.5.0

- No code changes.
Expand Down
5 changes: 4 additions & 1 deletion lib/src/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
@override
Future<void> close() async {
closed = true;
await _writeConnection?.close();
for (var connection in _readConnections) {
await connection.close();
}
// Closing the write connection cleans up the journal files (-shm and -wal files).
// It can only do that if there are no other open connections, so we close the
// read-only connections first.
await _writeConnection?.close();
}
}
5 changes: 3 additions & 2 deletions lib/src/database_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ Future<Set<String>> getSourceTablesText(
}

/// Given a SELECT query, return the tables that the query depends on.
Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql) async {
final rows = await ctx.getAll('EXPLAIN $sql');
Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql,
[List<Object?> parameters = const []]) async {
final rows = await ctx.getAll('EXPLAIN $sql', parameters);
List<int> rootpages = [];
for (var row in rows) {
if (row['opcode'] == 'OpenRead' && row['p3'] == 0 && row['p2'] is int) {
Expand Down
3 changes: 2 additions & 1 deletion lib/src/sqlite_queries.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
Iterable<String>? triggerOnTables}) async* {
assert(updates != null,
'updates stream must be provided to allow query watching');
final tables = triggerOnTables ?? await getSourceTables(this, sql);
final tables =
triggerOnTables ?? await getSourceTables(this, sql, parameters);
final filteredStream =
updates!.transform(UpdateNotification.filterTablesTransformer(tables));
final throttledStream = UpdateNotification.throttleStream(
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: sqlite_async
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
version: 0.5.0
version: 0.5.1
repository: https://github.com/journeyapps/sqlite_async.dart
environment:
sdk: '>=2.19.1 <4.0.0'
Expand Down
53 changes: 53 additions & 0 deletions test/close_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import 'dart:io';

import 'package:sqlite_async/sqlite_async.dart';
import 'package:test/test.dart';

import 'util.dart';

void main() {
group('Close Tests', () {
late String path;

setUp(() async {
path = dbPath();
await cleanDb(path: path);
});

tearDown(() async {
await cleanDb(path: path);
});

createTables(SqliteDatabase db) async {
await db.writeTransaction((tx) async {
await tx.execute(
'CREATE TABLE test_data(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)');
});
}

test('Open and close', () async {
// Test that the journal files are properly deleted after closing.
// If the write connection is closed before the read connections, that is
// not the case.

final db = await setupDatabase(path: path);
await createTables(db);

await db.execute(
'INSERT INTO test_data(description) VALUES(?)', ['Test Data']);
await db.getAll('SELECT * FROM test_data');

expect(await File('$path-wal').exists(), equals(true));
expect(await File('$path-shm').exists(), equals(true));

await db.close();

expect(await File(path).exists(), equals(true));

expect(await File('$path-wal').exists(), equals(false));
expect(await File('$path-shm').exists(), equals(false));

expect(await File('$path-journal').exists(), equals(false));
});
});
}
66 changes: 66 additions & 0 deletions test/watch_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,72 @@ void main() {
lastTime = r;
}
});

test('watch with parameters', () async {
final db = await setupDatabase(path: path);
await createTables(db);

const baseTime = 20;

const throttleDuration = Duration(milliseconds: baseTime);

final rows = await db.execute(
'INSERT INTO customers(name) VALUES (?) RETURNING id',
['a customer']);
final id = rows[0]['id'];

final stream = db.watch(
'SELECT count() AS count FROM assets WHERE customer_id = ?',
parameters: [id],
throttle: throttleDuration);

var done = false;
inserts() async {
while (!done) {
await db.execute(
'INSERT INTO assets(make, customer_id) VALUES (?, ?)',
['test', id]);
await Future.delayed(
Duration(milliseconds: Random().nextInt(baseTime * 2)));
}
}

const numberOfQueries = 10;

inserts();
try {
List<DateTime> times = [];
final results = await stream.take(numberOfQueries).map((e) {
times.add(DateTime.now());
return e;
}).toList();

var lastCount = 0;
for (var r in results) {
final count = r.first['count'];
// This is not strictly incrementing, since we can't guarantee the
// exact order between reads and writes.
// We can guarantee that there will always be a read after the last write,
// but the previous read may have been after the same write in some cases.
expect(count, greaterThanOrEqualTo(lastCount));
lastCount = count;
}

// The number of read queries must not be greater than the number of writes overall.
expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count']));

DateTime? lastTime;
for (var r in times) {
if (lastTime != null) {
var diff = r.difference(lastTime);
expect(diff, greaterThanOrEqualTo(throttleDuration));
}
lastTime = r;
}
} finally {
done = true;
}
});
});
}

Expand Down