Skip to content

Commit e6960da

Browse files
wip: indexeddb holds
1 parent d88fc5f commit e6960da

File tree

8 files changed

+247
-65
lines changed

8 files changed

+247
-65
lines changed

packages/web/src/db/adapters/AsyncDatabaseConnection.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void;
2525
export interface AsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> {
2626
init(): Promise<void>;
2727
close(): Promise<void>;
28+
markHold(): Promise<string>;
29+
releaseHold(holdId: string): Promise<void>;
2830
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
2931
executeRaw(sql: string, params?: any[]): Promise<any[][]>;
3032
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult>;

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import {
2-
type ILogger,
32
BaseObserver,
4-
createLogger,
53
DBAdapter,
64
DBAdapterListener,
75
DBGetUtils,
86
DBLockOptions,
97
LockContext,
108
QueryResult,
11-
Transaction
9+
Transaction,
10+
createLogger,
11+
type ILogger
1212
} from '@powersync/common';
1313
import { getNavigatorLocks } from '../..//shared/navigator';
1414
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
@@ -125,7 +125,7 @@ export class LockedAsyncDatabaseAdapter
125125
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
126126
throw new Error(`Only worker connections can be shared`);
127127
}
128-
return this._db.shareConnection();
128+
return (this._db as WorkerWrappedAsyncDatabaseConnection).shareConnection();
129129
}
130130

131131
/**
@@ -221,13 +221,22 @@ export class LockedAsyncDatabaseAdapter
221221
}, timeoutMs)
222222
: null;
223223

224-
return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => {
225-
this.pendingAbortControllers.delete(abortController);
226-
if (timoutId) {
227-
clearTimeout(timoutId);
224+
return getNavigatorLocks().request(
225+
`db-lock-${this._dbIdentifier}`,
226+
{ signal: abortController.signal },
227+
async () => {
228+
this.pendingAbortControllers.delete(abortController);
229+
if (timoutId) {
230+
clearTimeout(timoutId);
231+
}
232+
const holdId = await this.baseDB.markHold();
233+
try {
234+
return await callback();
235+
} finally {
236+
await this.baseDB.releaseHold(holdId);
237+
}
228238
}
229-
return callback();
230-
});
239+
);
231240
}
232241

233242
async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
6060
this.notifyRemoteClosed!.abort();
6161
}
6262

63+
markHold(): Promise<string> {
64+
return this.baseConnection.markHold();
65+
}
66+
67+
releaseHold(holdId: string): Promise<void> {
68+
return this.baseConnection.releaseHold(holdId);
69+
}
70+
6371
private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
6472
const controller = this.notifyRemoteClosed;
6573
if (controller) {

packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ export type WASQLiteBroadCastTableUpdateEvent = {
2626
*/
2727
export type WASQLiteConnectionListener = {
2828
tablesUpdated: (event: BatchedUpdateNotification) => void;
29+
/**
30+
* Triggered when an active hold is overwritten by a new hold.
31+
* This is most likely to happen when a shared connection has been closed
32+
* without releasing the hold.
33+
* This listener can be used to cleanup any resources associated with the previous hold.
34+
* @param holdId - The id of the hold that has been overwritten.
35+
*/
36+
holdOverwritten: (holdId: string) => Promise<void>;
2937
};
3038

3139
/**
@@ -148,6 +156,9 @@ export class WASqliteConnection
148156
*/
149157
protected connectionId: number;
150158

159+
protected _holdCounter: number;
160+
protected _holdId: string | null;
161+
151162
constructor(protected options: ResolvedWASQLiteOpenFactoryOptions) {
152163
super();
153164
this.updatedTables = new Set();
@@ -156,6 +167,16 @@ export class WASqliteConnection
156167
this.connectionId = new Date().valueOf() + Math.random();
157168
this.statementMutex = new Mutex();
158169
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs];
170+
this._holdCounter = 0;
171+
this._holdId = null;
172+
}
173+
174+
/**
175+
* Gets the id for the current hold.
176+
* This can be used to check for invalid states.
177+
*/
178+
get currentHoldId() {
179+
return this._holdId;
159180
}
160181

161182
protected get sqliteAPI() {
@@ -172,6 +193,22 @@ export class WASqliteConnection
172193
return this._dbP;
173194
}
174195

196+
async markHold(): Promise<string> {
197+
const previousHoldId = this._holdId;
198+
this._holdId = `${++this._holdCounter}`;
199+
if (previousHoldId) {
200+
await this.iterateAsyncListeners(async (cb) => cb.holdOverwritten?.(previousHoldId));
201+
}
202+
return this._holdId;
203+
}
204+
205+
async releaseHold(holdId: string): Promise<void> {
206+
if (holdId != this._holdId) {
207+
throw new Error(`Invalid hold state, expected ${this._holdId} but got ${holdId}`);
208+
}
209+
this._holdId = null;
210+
}
211+
175212
protected async openDB() {
176213
this._dbP = await this.sqliteAPI.open_v2(this.options.dbFilename);
177214
return this._dbP;
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import { ILogger } from '@powersync/common';
2+
import {
3+
AsyncDatabaseConnection,
4+
OnTableChangeCallback,
5+
ProxiedQueryResult
6+
} from '../../db/adapters/AsyncDatabaseConnection';
7+
import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags';
8+
9+
/**
10+
* Keeps track of open DB connections and the clients which
11+
* are using it.
12+
*/
13+
export type SharedDBWorkerConnection = {
14+
clientIds: Set<number>;
15+
db: AsyncDatabaseConnection;
16+
};
17+
18+
export type SharedWASQLiteConnectionOptions = {
19+
dbMap: Map<string, SharedDBWorkerConnection>;
20+
dbFilename: string;
21+
clientId: number;
22+
logger: ILogger;
23+
};
24+
25+
export class SharedWASQLiteConnection implements AsyncDatabaseConnection {
26+
protected isClosing: boolean;
27+
// Keeps track if this current hold if the shared connection has a hold
28+
protected activeHoldId: string | null;
29+
30+
constructor(protected options: SharedWASQLiteConnectionOptions) {
31+
// Add this client ID to the set of known clients
32+
this.clientIds.add(options.clientId);
33+
this.isClosing = false;
34+
this.activeHoldId = null;
35+
}
36+
37+
async init(): Promise<void> {
38+
// No-op since the connection is already initialized when it was created
39+
}
40+
41+
async markHold(): Promise<string> {
42+
this.activeHoldId = await this.connection.markHold();
43+
return this.activeHoldId;
44+
}
45+
46+
async releaseHold(id: string): Promise<void> {
47+
try {
48+
await this.connection.releaseHold(id);
49+
} finally {
50+
this.activeHoldId = null;
51+
}
52+
}
53+
54+
protected get logger() {
55+
return this.options.logger;
56+
}
57+
58+
protected get dbEntry() {
59+
return this.options.dbMap.get(this.options.dbFilename)!;
60+
}
61+
62+
protected get connection() {
63+
return this.dbEntry.db;
64+
}
65+
66+
protected get clientIds() {
67+
return this.dbEntry.clientIds;
68+
}
69+
70+
/**
71+
* Handles closing of a shared connection.
72+
* The connection is only closed if there are no active clients using it.
73+
*/
74+
async close(): Promise<void> {
75+
// This prevents further statements on this connection from being executed
76+
this.isClosing = true;
77+
const { clientIds, logger } = this;
78+
const { clientId, dbFilename, dbMap } = this.options;
79+
logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`);
80+
clientIds.delete(clientId);
81+
82+
if (this.activeHoldId) {
83+
/**
84+
* The hold hasn't been released, but we're closing now.
85+
* We can proactively cleanup and release the hold.
86+
*/
87+
await this.connection.execute('ROLLBACK').catch(() => {});
88+
await this.connection.releaseHold(this.activeHoldId).catch(() => {});
89+
}
90+
91+
if (clientIds.size == 0) {
92+
logger.debug(`Closing connection to ${this.options}.`);
93+
dbMap.delete(dbFilename);
94+
return this.connection.close();
95+
}
96+
logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`);
97+
return;
98+
}
99+
100+
protected async withClosing<T>(action: () => Promise<T>) {
101+
if (this.isClosing) {
102+
throw new Error('Connection is closing');
103+
}
104+
return action();
105+
}
106+
107+
async execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
108+
return this.withClosing(() => this.connection.execute(sql, params));
109+
}
110+
111+
async executeRaw(sql: string, params?: any[]): Promise<any[][]> {
112+
return this.withClosing(() => this.connection.executeRaw(sql, params));
113+
}
114+
115+
executeBatch(sql: string, params?: any[] | undefined): Promise<ProxiedQueryResult> {
116+
return this.withClosing(() => this.connection.executeBatch(sql, params));
117+
}
118+
119+
registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> {
120+
return this.connection.registerOnTableChange(callback);
121+
}
122+
123+
getConfig(): Promise<ResolvedWebSQLOpenOptions> {
124+
return this.connection.getConfig();
125+
}
126+
}

packages/web/src/worker/db/WASQLiteDB.worker.ts

Lines changed: 25 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,47 +6,20 @@ import '@journeyapps/wa-sqlite';
66
import { createBaseLogger, createLogger } from '@powersync/common';
77
import * as Comlink from 'comlink';
88
import { AsyncDatabaseConnection } from '../../db/adapters/AsyncDatabaseConnection';
9-
import { WASqliteConnection } from '../../db/adapters/wa-sqlite/WASQLiteConnection';
10-
import {
11-
ResolvedWASQLiteOpenFactoryOptions,
12-
WorkerDBOpenerOptions
13-
} from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory';
9+
import { WorkerDBOpenerOptions } from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory';
1410
import { getNavigatorLocks } from '../../shared/navigator';
11+
import { SharedDBWorkerConnection, SharedWASQLiteConnection } from './SharedWASQLiteConnection';
12+
import { WorkerWASQLiteConnection, proxyWASQLiteConnection } from './WorkerWASQLiteConnection';
1513

1614
const baseLogger = createBaseLogger();
1715
baseLogger.useDefaults();
1816
const logger = createLogger('db-worker');
1917

20-
/**
21-
* Keeps track of open DB connections and the clients which
22-
* are using it.
23-
*/
24-
type SharedDBWorkerConnection = {
25-
clientIds: Set<number>;
26-
db: AsyncDatabaseConnection;
27-
};
28-
2918
const DBMap = new Map<string, SharedDBWorkerConnection>();
3019
const OPEN_DB_LOCK = 'open-wasqlite-db';
3120

3221
let nextClientId = 1;
3322

34-
const openWorkerConnection = async (options: ResolvedWASQLiteOpenFactoryOptions): Promise<AsyncDatabaseConnection> => {
35-
const connection = new WASqliteConnection(options);
36-
return {
37-
init: Comlink.proxy(() => connection.init()),
38-
getConfig: Comlink.proxy(() => connection.getConfig()),
39-
close: Comlink.proxy(() => connection.close()),
40-
execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)),
41-
executeRaw: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeRaw(sql, params)),
42-
executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)),
43-
registerOnTableChange: Comlink.proxy(async (callback) => {
44-
// Proxy the callback remove function
45-
return Comlink.proxy(await connection.registerOnTableChange(callback));
46-
})
47-
};
48-
};
49-
5023
const openDBShared = async (options: WorkerDBOpenerOptions): Promise<AsyncDatabaseConnection> => {
5124
// Prevent multiple simultaneous opens from causing race conditions
5225
return getNavigatorLocks().request(OPEN_DB_LOCK, async () => {
@@ -57,38 +30,36 @@ const openDBShared = async (options: WorkerDBOpenerOptions): Promise<AsyncDataba
5730

5831
if (!DBMap.has(dbFilename)) {
5932
const clientIds = new Set<number>();
60-
const connection = await openWorkerConnection(options);
33+
// This format returns proxy objects for function callbacks
34+
const connection = new WorkerWASQLiteConnection(options);
6135
await connection.init();
36+
37+
connection.registerListener({
38+
holdOverwritten: async () => {
39+
/**
40+
* The previous hold has been overwritten, without being released.
41+
* we need to cleanup any resources associated with it.
42+
* We can perform a rollback to release any potential transactions that were started.
43+
*/
44+
await connection.execute('ROLLBACK').catch(() => {});
45+
}
46+
});
47+
6248
DBMap.set(dbFilename, {
6349
clientIds,
6450
db: connection
6551
});
6652
}
6753

68-
const dbEntry = DBMap.get(dbFilename)!;
69-
dbEntry.clientIds.add(clientId);
70-
const { db } = dbEntry;
71-
72-
const wrappedConnection = {
73-
...db,
74-
init: Comlink.proxy(async () => {
75-
// the init has been done automatically
76-
}),
77-
close: Comlink.proxy(async () => {
78-
const { clientIds } = dbEntry;
79-
logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`);
80-
clientIds.delete(clientId);
81-
if (clientIds.size == 0) {
82-
logger.debug(`Closing connection to ${dbFilename}.`);
83-
DBMap.delete(dbFilename);
84-
return db.close?.();
85-
}
86-
logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`);
87-
return;
88-
})
89-
};
54+
// Associates this clientId with the shared connection entry
55+
const sharedConnection = new SharedWASQLiteConnection({
56+
dbMap: DBMap,
57+
dbFilename,
58+
clientId,
59+
logger
60+
});
9061

91-
return Comlink.proxy(wrappedConnection);
62+
return proxyWASQLiteConnection(sharedConnection);
9263
});
9364
};
9465

0 commit comments

Comments
 (0)