Skip to content

Commit 41174b1

Browse files
Use SQLite Session API for PowerSync updates.
1 parent 6f0e630 commit 41174b1

File tree

5 files changed

+85
-107
lines changed

5 files changed

+85
-107
lines changed

Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Sources/PowerSync/Kotlin/KotlinSQLiteConnectionPool.swift

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
2020
self.pool = pool
2121
}
2222

23-
func linkUpdates(callback: any KotlinSuspendFunction1) {
23+
func linkExternalUpdates(callback: any KotlinSuspendFunction1) {
2424
updateTrackingTask = Task {
2525
do {
2626
for try await updates in pool.tableUpdates {
@@ -32,93 +32,72 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
3232
}
3333
}
3434

35-
func __closePool() async throws {
36-
do {
35+
func __processPowerSyncUpdates(updates: Set<String>) async throws {
36+
return try await wrapExceptions {
37+
try await pool.processPowerSyncUpdates(updates)
38+
}
39+
}
40+
41+
func __dispose() async throws {
42+
return try await wrapExceptions {
3743
updateTrackingTask?.cancel()
3844
updateTrackingTask = nil
39-
try pool.close()
40-
} catch {
41-
try? PowerSyncKotlin.throwPowerSyncException(
42-
exception: PowerSyncException(
43-
message: error.localizedDescription,
44-
cause: nil
45-
)
46-
)
4745
}
4846
}
4947

5048
func __leaseRead(callback: any LeaseCallback) async throws {
51-
do {
52-
var errorToThrow: Error?
49+
return try await wrapExceptions {
5350
try await pool.read { lease in
54-
do {
55-
try callback.execute(
56-
lease: KotlinLeaseAdapter(
57-
lease: lease
58-
)
51+
try callback.execute(
52+
lease: KotlinLeaseAdapter(
53+
lease: lease
5954
)
60-
} catch {
61-
errorToThrow = error
62-
}
63-
}
64-
if let errorToThrow {
65-
throw errorToThrow
66-
}
67-
} catch {
68-
try? PowerSyncKotlin.throwPowerSyncException(
69-
exception: PowerSyncException(
70-
message: error.localizedDescription,
71-
cause: nil
7255
)
73-
)
56+
}
7457
}
7558
}
7659

7760
func __leaseWrite(callback: any LeaseCallback) async throws {
78-
do {
79-
var errorToThrow: Error?
61+
return try await wrapExceptions {
8062
try await pool.write { lease in
81-
do {
82-
try callback.execute(
83-
lease: KotlinLeaseAdapter(
84-
lease: lease
85-
)
63+
try callback.execute(
64+
lease: KotlinLeaseAdapter(
65+
lease: lease
8666
)
87-
} catch {
88-
errorToThrow = error
89-
}
90-
}
91-
if let errorToThrow {
92-
throw errorToThrow
93-
}
94-
} catch {
95-
try? PowerSyncKotlin.throwPowerSyncException(
96-
exception: PowerSyncException(
97-
message: error.localizedDescription,
98-
cause: nil
9967
)
100-
)
68+
}
10169
}
10270
}
10371

10472
func __leaseAll(callback: any AllLeaseCallback) async throws {
105-
// TODO, actually use all connections
106-
do {
73+
// FIXME, actually use all connections
74+
// We currently only use this for schema updates
75+
return try await wrapExceptions {
10776
try await pool.write { lease in
108-
try? callback.execute(
77+
try callback.execute(
10978
writeLease: KotlinLeaseAdapter(
11079
lease: lease
11180
),
11281
readLeases: []
11382
)
11483
}
84+
}
85+
}
86+
87+
private func wrapExceptions<Result>(
88+
_ callback: () async throws -> Result
89+
) async throws -> Result {
90+
do {
91+
return try await callback()
11592
} catch {
11693
try? PowerSyncKotlin.throwPowerSyncException(
11794
exception: PowerSyncException(
11895
message: error.localizedDescription,
11996
cause: nil
12097
)
12198
)
99+
// Won't reach here
100+
throw error
122101
}
123102
}
124103
}

Sources/PowerSync/Protocol/SQLiteConnectionPool.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import Foundation
22

3+
/// A lease representing a temporarily borrowed SQLite connection from the pool.
34
public protocol SQLiteConnectionLease {
5+
/// Pointer to the underlying SQLite connection.
46
var pointer: OpaquePointer { get }
57
}
68

@@ -9,22 +11,26 @@ public protocol SQLiteConnectionLease {
911
public protocol SQLiteConnectionPoolProtocol {
1012
var tableUpdates: AsyncStream<Set<String>> { get }
1113

14+
/// Processes updates from PowerSync, notifying any active leases of changes
15+
/// (made by PowerSync) to tracked tables.
16+
func processPowerSyncUpdates(_ updates: Set<String>) async throws
17+
1218
/// Calls the callback with a read-only connection temporarily leased from the pool.
1319
func read(
14-
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void,
20+
onConnection: @Sendable @escaping (SQLiteConnectionLease) throws -> Void,
1521
) async throws
1622

1723
/// Calls the callback with a read-write connection temporarily leased from the pool.
1824
func write(
19-
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void,
25+
onConnection: @Sendable @escaping (SQLiteConnectionLease) throws -> Void,
2026
) async throws
2127

2228
/// Invokes the callback with all connections leased from the pool.
2329
func withAllConnections(
2430
onConnection: @Sendable @escaping (
2531
_ writer: SQLiteConnectionLease,
2632
_ readers: [SQLiteConnectionLease]
27-
) -> Void,
33+
) throws -> Void,
2834
) async throws
2935

3036
/// Closes the connection pool and associated resources.

Sources/PowerSyncGRDB/Connections/GRDBConnectionPool.swift

Lines changed: 20 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,25 @@ public final class GRDBConnectionPool: SQLiteConnectionPoolProtocol {
3737
tableUpdatesContinuation = tempContinuation
3838
}
3939

40+
public func processPowerSyncUpdates(_ updates: Set<String>) async throws {
41+
try await pool.write { database in
42+
for table in updates {
43+
try database.notifyChanges(in: Table(table))
44+
if table.hasPrefix("ps_data__") {
45+
let stripped = String(table.dropFirst("ps_data__".count))
46+
try database.notifyChanges(in: Table(stripped))
47+
} else if table.hasPrefix("ps_data_local__") {
48+
let stripped = String(table.dropFirst("ps_data_local__".count))
49+
try database.notifyChanges(in: Table(stripped))
50+
}
51+
}
52+
}
53+
// Pass the updates to the output stream
54+
tableUpdatesContinuation?.yield(updates)
55+
}
56+
4057
public func read(
41-
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void
58+
onConnection: @Sendable @escaping (SQLiteConnectionLease) throws -> Void
4259
) async throws {
4360
try await pool.read { database in
4461
try onConnection(
@@ -48,63 +65,18 @@ public final class GRDBConnectionPool: SQLiteConnectionPoolProtocol {
4865
}
4966

5067
public func write(
51-
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void
68+
onConnection: @Sendable @escaping (SQLiteConnectionLease) throws -> Void
5269
) async throws {
5370
// Don't start an explicit transaction, we do this internally
54-
let updateBroker = UpdateBroker()
5571
try await pool.writeWithoutTransaction { database in
56-
57-
let brokerPointer = Unmanaged.passUnretained(updateBroker).toOpaque()
58-
59-
/// GRDB only registers an update hook if it detects a requirement for one.
60-
/// It also removes its own update hook if no longer needed.
61-
/// We use the SQLite connection pointer directly, which sidesteps GRDB.
62-
/// We can register our own temporary update hook here.
63-
let previousParamPointer = sqlite3_update_hook(
64-
database.sqliteConnection,
65-
{ brokerPointer, _, _, tableNameCString, _ in
66-
let broker = Unmanaged<UpdateBroker>.fromOpaque(brokerPointer!).takeUnretainedValue()
67-
broker.updates.insert(String(cString: tableNameCString!))
68-
},
69-
brokerPointer
70-
)
71-
72-
// This should not be present
73-
assert(previousParamPointer == nil, "A pre-existing update hook was already registered and has been overwritten.")
74-
75-
defer {
76-
// Deregister our temporary hook
77-
sqlite3_update_hook(database.sqliteConnection, nil, nil)
78-
}
79-
8072
try onConnection(
8173
GRDBConnectionLease(database: database)
8274
)
8375
}
84-
85-
// Notify GRDB consumers of updates
86-
// Seems like we need to do this in a write transaction
87-
try await pool.write { database in
88-
for table in updateBroker.updates {
89-
try database.notifyChanges(in: Table(table))
90-
if table.hasPrefix("ps_data__") {
91-
let stripped = String(table.dropFirst("ps_data__".count))
92-
try database.notifyChanges(in: Table(stripped))
93-
} else if table.hasPrefix("ps_data_local__") {
94-
let stripped = String(table.dropFirst("ps_data_local__".count))
95-
try database.notifyChanges(in: Table(stripped))
96-
}
97-
}
98-
}
99-
guard let pushUpdates = tableUpdatesContinuation else {
100-
return
101-
}
102-
// Notify the PowerSync SDK consumers of updates
103-
pushUpdates.yield(updateBroker.updates)
10476
}
10577

10678
public func withAllConnections(
107-
onConnection _: @escaping (SQLiteConnectionLease, [SQLiteConnectionLease]) -> Void
79+
onConnection _: @escaping (SQLiteConnectionLease, [SQLiteConnectionLease]) throws -> Void
10880
) async throws {
10981
// TODO:
11082
}

Tests/PowerSyncGRDBTests/BasicTest.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,18 @@ final class GRDBTests: XCTestCase {
321321
watchTask.cancel()
322322
}
323323

324+
func testShouldThrowErrorsFromPowerSync() async throws {
325+
do {
326+
try await database.execute(
327+
sql: "INSERT INTO non_existent_table(id, name) VALUES(uuid(), ?)",
328+
parameters: ["one"]
329+
)
330+
XCTFail("Should throw error")
331+
} catch {
332+
XCTAssertTrue(error.localizedDescription.contains("non_existent_table")) // Expected
333+
}
334+
}
335+
324336
func testGRDBUpdatesFromGRDB() async throws {
325337
let expectation = XCTestExpectation(description: "Watch changes")
326338

0 commit comments

Comments
 (0)