Skip to content

Commit 6f0e630

Browse files
Table updates from PowerSync side
1 parent 4002d85 commit 6f0e630

File tree

13 files changed

+369
-193
lines changed

13 files changed

+369
-193
lines changed

Demo/GRDB Demo/GRDB Demo/GRDB_DemoApp.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ func openDatabase()
3333
.appendingPathComponent("test.sqlite")
3434

3535
var config = Configuration()
36-
37-
configurePowerSync(
38-
config: &config,
36+
config.configurePowerSync(
3937
schema: schema
4038
)
4139

Sources/PowerSync/Kotlin/KotlinSQLiteConnectionPool.swift

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,41 @@
11
import PowerSyncKotlin
22

3+
class KotlinLeaseAdapter: PowerSyncKotlin.SwiftLeaseAdapter {
4+
let pointer: UnsafeMutableRawPointer
5+
6+
init(
7+
lease: SQLiteConnectionLease
8+
) {
9+
pointer = UnsafeMutableRawPointer(lease.pointer)
10+
}
11+
}
12+
313
final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
414
let pool: SQLiteConnectionPoolProtocol
15+
var updateTrackingTask: Task<Void, Never>?
516

617
init(
718
pool: SQLiteConnectionPoolProtocol
819
) {
920
self.pool = pool
1021
}
1122

12-
func getPendingUpdates() -> Set<String> {
13-
return pool.getPendingUpdates()
23+
func linkUpdates(callback: any KotlinSuspendFunction1) {
24+
updateTrackingTask = Task {
25+
do {
26+
for try await updates in pool.tableUpdates {
27+
_ = try await callback.invoke(p1: updates)
28+
}
29+
} catch {
30+
// none of these calls should actually throw
31+
}
32+
}
1433
}
1534

1635
func __closePool() async throws {
1736
do {
37+
updateTrackingTask?.cancel()
38+
updateTrackingTask = nil
1839
try pool.close()
1940
} catch {
2041
try? PowerSyncKotlin.throwPowerSyncException(
@@ -26,10 +47,22 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
2647
}
2748
}
2849

29-
func __leaseRead(callback: @escaping (Any) -> Void) async throws {
50+
func __leaseRead(callback: any LeaseCallback) async throws {
3051
do {
31-
try await pool.read { pointer in
32-
callback(UInt(bitPattern: pointer))
52+
var errorToThrow: Error?
53+
try await pool.read { lease in
54+
do {
55+
try callback.execute(
56+
lease: KotlinLeaseAdapter(
57+
lease: lease
58+
)
59+
)
60+
} catch {
61+
errorToThrow = error
62+
}
63+
}
64+
if let errorToThrow {
65+
throw errorToThrow
3366
}
3467
} catch {
3568
try? PowerSyncKotlin.throwPowerSyncException(
@@ -41,10 +74,22 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
4174
}
4275
}
4376

44-
func __leaseWrite(callback: @escaping (Any) -> Void) async throws {
77+
func __leaseWrite(callback: any LeaseCallback) async throws {
4578
do {
46-
try await pool.write { pointer in
47-
callback(UInt(bitPattern: pointer))
79+
var errorToThrow: Error?
80+
try await pool.write { lease in
81+
do {
82+
try callback.execute(
83+
lease: KotlinLeaseAdapter(
84+
lease: lease
85+
)
86+
)
87+
} catch {
88+
errorToThrow = error
89+
}
90+
}
91+
if let errorToThrow {
92+
throw errorToThrow
4893
}
4994
} catch {
5095
try? PowerSyncKotlin.throwPowerSyncException(
@@ -56,11 +101,16 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
56101
}
57102
}
58103

59-
func __leaseAll(callback: @escaping (Any, [Any]) -> Void) async throws {
104+
func __leaseAll(callback: any AllLeaseCallback) async throws {
60105
// TODO, actually use all connections
61106
do {
62-
try await pool.write { pointer in
63-
callback(UInt(bitPattern: pointer), [])
107+
try await pool.write { lease in
108+
try? callback.execute(
109+
writeLease: KotlinLeaseAdapter(
110+
lease: lease
111+
),
112+
readLeases: []
113+
)
64114
}
65115
} catch {
66116
try? PowerSyncKotlin.throwPowerSyncException(

Sources/PowerSync/Protocol/SQLiteConnectionPool.swift

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
import Foundation
22

3+
public protocol SQLiteConnectionLease {
4+
var pointer: OpaquePointer { get }
5+
}
6+
37
/// An implementation of a connection pool providing asynchronous access to a single writer and multiple readers.
48
/// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on.
59
public protocol SQLiteConnectionPoolProtocol {
6-
func getPendingUpdates() -> Set<String>
10+
var tableUpdates: AsyncStream<Set<String>> { get }
711

812
/// Calls the callback with a read-only connection temporarily leased from the pool.
913
func read(
10-
onConnection: @Sendable @escaping (OpaquePointer) -> Void,
14+
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void,
1115
) async throws
1216

1317
/// Calls the callback with a read-write connection temporarily leased from the pool.
1418
func write(
15-
onConnection: @Sendable @escaping (OpaquePointer) -> Void,
19+
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void,
1620
) async throws
1721

1822
/// Invokes the callback with all connections leased from the pool.
1923
func withAllConnections(
2024
onConnection: @Sendable @escaping (
21-
_ writer: OpaquePointer,
22-
_ readers: [OpaquePointer]
25+
_ writer: SQLiteConnectionLease,
26+
_ readers: [SQLiteConnectionLease]
2327
) -> Void,
2428
) async throws
2529

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import Foundation
2+
import GRDB
3+
import PowerSync
4+
import SQLite3
5+
6+
/// Extension for GRDB `Configuration` to add PowerSync support.
7+
///
8+
/// Call `configurePowerSync(schema:)` on your existing GRDB `Configuration` to:
9+
/// - Register the PowerSync SQLite core extension (required for PowerSync features).
10+
/// - Add PowerSync schema views to your database schema source.
11+
///
12+
/// This enables PowerSync replication and view management in your GRDB database.
13+
///
14+
/// Example usage:
15+
/// ```swift
16+
/// var config = Configuration()
17+
/// config.configurePowerSync(schema: mySchema)
18+
/// let dbQueue = try DatabaseQueue(path: dbPath, configuration: config)
19+
/// ```
20+
///
21+
/// - Parameter schema: The PowerSync `Schema` describing your sync views.
22+
public extension Configuration {
23+
mutating func configurePowerSync(
24+
schema: Schema
25+
) {
26+
// Register the PowerSync core extension
27+
prepareDatabase { database in
28+
guard let bundle = Bundle(identifier: "co.powersync.sqlitecore") else {
29+
throw PowerSyncGRDBError.coreBundleNotFound
30+
}
31+
32+
// Construct the full path to the shared library inside the bundle
33+
let fullPath = bundle.bundlePath + "/powersync-sqlite-core"
34+
35+
let extensionLoadResult = sqlite3_enable_load_extension(database.sqliteConnection, 1)
36+
if extensionLoadResult != SQLITE_OK {
37+
throw PowerSyncGRDBError.extensionLoadFailed("Could not enable extension loading")
38+
}
39+
var errorMsg: UnsafeMutablePointer<Int8>?
40+
let loadResult = sqlite3_load_extension(database.sqliteConnection, fullPath, "sqlite3_powersync_init", &errorMsg)
41+
if loadResult != SQLITE_OK {
42+
if let errorMsg = errorMsg {
43+
let message = String(cString: errorMsg)
44+
sqlite3_free(errorMsg)
45+
throw PowerSyncGRDBError.extensionLoadFailed(message)
46+
} else {
47+
throw PowerSyncGRDBError.unknownExtensionLoadError
48+
}
49+
}
50+
}
51+
52+
// Supply the PowerSync views as a SchemaSource
53+
let powerSyncSchemaSource = PowerSyncSchemaSource(
54+
schema: schema
55+
)
56+
if let schemaSource = schemaSource {
57+
self.schemaSource = schemaSource.then(powerSyncSchemaSource)
58+
} else {
59+
schemaSource = powerSyncSchemaSource
60+
}
61+
}
62+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import GRDB
2+
import PowerSync
3+
4+
/// A schema source used by GRDB to resolve primary keys for PowerSync views.
5+
///
6+
/// This struct allows GRDB to identify the primary key columns for tables/views
7+
/// defined in the PowerSync schema, enabling correct integration with GRDB's
8+
/// database observation and record management features.
9+
struct PowerSyncSchemaSource: DatabaseSchemaSource {
10+
let schema: Schema
11+
12+
func columnsForPrimaryKey(_: Database, inView view: DatabaseObjectID) throws -> [String]? {
13+
if schema.tables.first(where: { table in
14+
table.viewName == view.name
15+
}) != nil {
16+
return ["id"]
17+
}
18+
return nil
19+
}
20+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import Foundation
2+
import GRDB
3+
import PowerSync
4+
5+
/// Internal lease object that exposes the raw GRDB SQLite connection pointer.
6+
///
7+
/// This is used to bridge GRDB's managed database connection with the Kotlin SDK,
8+
/// allowing direct access to the underlying SQLite connection for PowerSync operations.
9+
final class GRDBConnectionLease: SQLiteConnectionLease {
10+
var pointer: OpaquePointer
11+
12+
init(database: Database) throws {
13+
guard let connection = database.sqliteConnection else {
14+
throw PowerSyncGRDBError.connectionUnavailable
15+
}
16+
pointer = connection
17+
}
18+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import Foundation
2+
import GRDB
3+
import PowerSync
4+
import SQLite3
5+
6+
/// Adapts a GRDB `DatabasePool` for use with the PowerSync SDK.
7+
///
8+
/// This class implements `SQLiteConnectionPoolProtocol` and provides
9+
/// integration between GRDB's connection pool and PowerSync's requirements,
10+
/// including table update observation and direct access to SQLite connections.
11+
///
12+
/// - Provides async streams of table updates for replication.
13+
/// - Bridges GRDB's managed connections to PowerSync's lease abstraction.
14+
/// - Allows both read and write access to raw SQLite connections.
15+
public final class GRDBConnectionPool: SQLiteConnectionPoolProtocol {
16+
let pool: DatabasePool
17+
18+
public private(set) var tableUpdates: AsyncStream<Set<String>>
19+
private var tableUpdatesContinuation: AsyncStream<Set<String>>.Continuation?
20+
21+
public init(
22+
pool: DatabasePool
23+
) {
24+
self.pool = pool
25+
// Cannot capture Self before initializing all properties
26+
var tempContinuation: AsyncStream<Set<String>>.Continuation?
27+
tableUpdates = AsyncStream { continuation in
28+
tempContinuation = continuation
29+
pool.add(
30+
transactionObserver: PowerSyncTransactionObserver { updates in
31+
// push the update
32+
continuation.yield(updates)
33+
},
34+
extent: .databaseLifetime
35+
)
36+
}
37+
tableUpdatesContinuation = tempContinuation
38+
}
39+
40+
public func read(
41+
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void
42+
) async throws {
43+
try await pool.read { database in
44+
try onConnection(
45+
GRDBConnectionLease(database: database)
46+
)
47+
}
48+
}
49+
50+
public func write(
51+
onConnection: @Sendable @escaping (SQLiteConnectionLease) -> Void
52+
) async throws {
53+
// Don't start an explicit transaction, we do this internally
54+
let updateBroker = UpdateBroker()
55+
try await pool.writeWithoutTransaction { database in
56+
57+
let brokerPointer = Unmanaged.passUnretained(updateBroker).toOpaque()
58+
59+
/// GRDB only registers an update hook if it detects a requirement for one.
60+
/// It also removes its own update hook if no longer needed.
61+
/// We use the SQLite connection pointer directly, which sidesteps GRDB.
62+
/// We can register our own temporary update hook here.
63+
let previousParamPointer = sqlite3_update_hook(
64+
database.sqliteConnection,
65+
{ brokerPointer, _, _, tableNameCString, _ in
66+
let broker = Unmanaged<UpdateBroker>.fromOpaque(brokerPointer!).takeUnretainedValue()
67+
broker.updates.insert(String(cString: tableNameCString!))
68+
},
69+
brokerPointer
70+
)
71+
72+
// This should not be present
73+
assert(previousParamPointer == nil, "A pre-existing update hook was already registered and has been overwritten.")
74+
75+
defer {
76+
// Deregister our temporary hook
77+
sqlite3_update_hook(database.sqliteConnection, nil, nil)
78+
}
79+
80+
try onConnection(
81+
GRDBConnectionLease(database: database)
82+
)
83+
}
84+
85+
// Notify GRDB consumers of updates
86+
// Seems like we need to do this in a write transaction
87+
try await pool.write { database in
88+
for table in updateBroker.updates {
89+
try database.notifyChanges(in: Table(table))
90+
if table.hasPrefix("ps_data__") {
91+
let stripped = String(table.dropFirst("ps_data__".count))
92+
try database.notifyChanges(in: Table(stripped))
93+
} else if table.hasPrefix("ps_data_local__") {
94+
let stripped = String(table.dropFirst("ps_data_local__".count))
95+
try database.notifyChanges(in: Table(stripped))
96+
}
97+
}
98+
}
99+
guard let pushUpdates = tableUpdatesContinuation else {
100+
return
101+
}
102+
// Notify the PowerSync SDK consumers of updates
103+
pushUpdates.yield(updateBroker.updates)
104+
}
105+
106+
public func withAllConnections(
107+
onConnection _: @escaping (SQLiteConnectionLease, [SQLiteConnectionLease]) -> Void
108+
) async throws {
109+
// TODO:
110+
}
111+
112+
public func close() throws {
113+
try pool.close()
114+
}
115+
}

Sources/PowerSyncGRDB/Errors.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/// Errors thrown by the PowerSyncGRDB integration layer.
2+
///
3+
/// These errors represent issues encountered when bridging GRDB and PowerSync,
4+
/// such as missing extensions, failed extension loads, or unavailable connections.
5+
public enum PowerSyncGRDBError: Error {
6+
/// The PowerSync SQLite core bundle could not be found.
7+
case coreBundleNotFound
8+
9+
/// Failed to load the PowerSync SQLite extension, with an associated error message.
10+
case extensionLoadFailed(String)
11+
12+
/// An unknown error occurred while loading the PowerSync SQLite extension.
13+
case unknownExtensionLoadError
14+
15+
/// The underlying SQLite connection could not be obtained from GRDB.
16+
case connectionUnavailable
17+
}

0 commit comments

Comments
 (0)