Skip to content

Commit e88154e

Browse files
fix: Potential Multitab Transaction Issues With IndexedDB (#775)
1 parent d88fc5f commit e88154e

File tree

11 files changed

+405
-80
lines changed

11 files changed

+405
-80
lines changed

.changeset/silver-insects-unite.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/web': patch
3+
---
4+
5+
- Fixed an issue where IndexedDB could cause "cannot start a transaction within a transaction" errors.
6+
- Improved reconnect logic when multiple tabs are closed.

packages/node/tests/utils.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
150150

151151
const newConnection = async (options?: Partial<NodePowerSyncDatabaseOptions>) => {
152152
const db = await createDatabase(tmpdir, {
153+
// This might help with test stability/timeouts if a retry is needed
154+
retryDelayMs: 100,
153155
...options,
154156
database: {
155157
dbFilename: databaseName,

packages/web/src/db/adapters/AsyncDatabaseConnection.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,21 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void;
2525
export interface AsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> {
2626
init(): Promise<void>;
2727
close(): Promise<void>;
28+
/**
29+
* Marks the connection as in-use by a certain actor.
30+
* @returns A hold ID which can be used to release the hold.
31+
*/
32+
markHold(): Promise<string>;
33+
/**
34+
* Releases a hold on the connection.
35+
* @param holdId The hold ID to release.
36+
*/
37+
releaseHold(holdId: string): Promise<void>;
38+
/**
39+
* Checks if the database connection is in autocommit mode.
40+
* @returns true if in autocommit mode, false if in a transaction
41+
*/
42+
isAutoCommit(): Promise<boolean>;
2843
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
2944
executeRaw(sql: string, params?: any[]): Promise<any[][]>;
3045
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult>;

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
11
import {
2-
type ILogger,
32
BaseObserver,
4-
createLogger,
53
DBAdapter,
64
DBAdapterListener,
75
DBGetUtils,
86
DBLockOptions,
97
LockContext,
108
QueryResult,
11-
Transaction
9+
Transaction,
10+
createLogger,
11+
type ILogger
1212
} from '@powersync/common';
1313
import { getNavigatorLocks } from '../..//shared/navigator';
1414
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
1515
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
1616
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';
17+
import { WASQLiteVFS } from './wa-sqlite/WASQLiteConnection';
18+
import { ResolvedWASQLiteOpenFactoryOptions } from './wa-sqlite/WASQLiteOpenFactory';
1719
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
1820

1921
/**
@@ -48,6 +50,7 @@ export class LockedAsyncDatabaseAdapter
4850
protected _disposeTableChangeListener: (() => void) | null = null;
4951
private _config: ResolvedWebSQLOpenOptions | null = null;
5052
protected pendingAbortControllers: Set<AbortController>;
53+
protected requiresHolds: boolean | null;
5154

5255
closing: boolean;
5356
closed: boolean;
@@ -59,6 +62,7 @@ export class LockedAsyncDatabaseAdapter
5962
this.pendingAbortControllers = new Set<AbortController>();
6063
this.closed = false;
6164
this.closing = false;
65+
this.requiresHolds = null;
6266
// Set the name if provided. We can query for the name if not available yet
6367
this.debugMode = options.debugMode ?? false;
6468
if (this.debugMode) {
@@ -107,6 +111,10 @@ export class LockedAsyncDatabaseAdapter
107111
this._config = await this._db.getConfig();
108112
await this.registerOnChangeListener(this._db);
109113
this.iterateListeners((cb) => cb.initialized?.());
114+
/**
115+
* This is only required for the long-lived shared IndexedDB connections.
116+
*/
117+
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;
110118
}
111119

112120
getConfiguration(): ResolvedWebSQLOpenOptions {
@@ -125,7 +133,7 @@ export class LockedAsyncDatabaseAdapter
125133
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
126134
throw new Error(`Only worker connections can be shared`);
127135
}
128-
return this._db.shareConnection();
136+
return (this._db as WorkerWrappedAsyncDatabaseConnection).shareConnection();
129137
}
130138

131139
/**
@@ -221,13 +229,24 @@ export class LockedAsyncDatabaseAdapter
221229
}, timeoutMs)
222230
: null;
223231

224-
return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => {
225-
this.pendingAbortControllers.delete(abortController);
226-
if (timoutId) {
227-
clearTimeout(timoutId);
232+
return getNavigatorLocks().request(
233+
`db-lock-${this._dbIdentifier}`,
234+
{ signal: abortController.signal },
235+
async () => {
236+
this.pendingAbortControllers.delete(abortController);
237+
if (timoutId) {
238+
clearTimeout(timoutId);
239+
}
240+
const holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
241+
try {
242+
return await callback();
243+
} finally {
244+
if (holdId) {
245+
await this.baseDB.releaseHold(holdId);
246+
}
247+
}
228248
}
229-
return callback();
230-
});
249+
);
231250
}
232251

233252
async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,26 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
6060
this.notifyRemoteClosed!.abort();
6161
}
6262

63+
markHold(): Promise<string> {
64+
return this.withRemote(() => this.baseConnection.markHold());
65+
}
66+
67+
releaseHold(holdId: string): Promise<void> {
68+
return this.withRemote(() => this.baseConnection.releaseHold(holdId));
69+
}
70+
71+
isAutoCommit(): Promise<boolean> {
72+
return this.withRemote(() => this.baseConnection.isAutoCommit());
73+
}
74+
6375
private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
6476
const controller = this.notifyRemoteClosed;
6577
if (controller) {
6678
return new Promise((resolve, reject) => {
6779
if (controller.signal.aborted) {
6880
reject(new Error('Called operation on closed remote'));
81+
// Don't run the operation if we're going to reject
82+
return;
6983
}
7084

7185
function handleAbort() {

packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ export type WASQLiteBroadCastTableUpdateEvent = {
2626
*/
2727
export type WASQLiteConnectionListener = {
2828
tablesUpdated: (event: BatchedUpdateNotification) => void;
29+
/**
30+
* Triggered when an active hold is overwritten by a new hold.
31+
* This is most likely to happen when a shared connection has been closed
32+
* without releasing the hold.
33+
* This listener can be used to cleanup any resources associated with the previous hold.
34+
* @param holdId - The id of the hold that has been overwritten.
35+
*/
36+
holdOverwritten: (holdId: string) => Promise<void>;
2937
};
3038

3139
/**
@@ -148,6 +156,9 @@ export class WASqliteConnection
148156
*/
149157
protected connectionId: number;
150158

159+
protected _holdCounter: number;
160+
protected _holdId: string | null;
161+
151162
constructor(protected options: ResolvedWASQLiteOpenFactoryOptions) {
152163
super();
153164
this.updatedTables = new Set();
@@ -156,6 +167,16 @@ export class WASqliteConnection
156167
this.connectionId = new Date().valueOf() + Math.random();
157168
this.statementMutex = new Mutex();
158169
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs];
170+
this._holdCounter = 0;
171+
this._holdId = null;
172+
}
173+
174+
/**
175+
* Gets the id for the current hold.
176+
* This can be used to check for invalid states.
177+
*/
178+
get currentHoldId() {
179+
return this._holdId;
159180
}
160181

161182
protected get sqliteAPI() {
@@ -172,6 +193,30 @@ export class WASqliteConnection
172193
return this._dbP;
173194
}
174195

196+
/**
197+
* Checks if the database connection is in autocommit mode.
198+
* @returns true if in autocommit mode, false if in a transaction
199+
*/
200+
async isAutoCommit(): Promise<boolean> {
201+
return this.sqliteAPI.get_autocommit(this.dbP) != 0;
202+
}
203+
204+
async markHold(): Promise<string> {
205+
const previousHoldId = this._holdId;
206+
this._holdId = `${++this._holdCounter}`;
207+
if (previousHoldId) {
208+
await this.iterateAsyncListeners(async (cb) => cb.holdOverwritten?.(previousHoldId));
209+
}
210+
return this._holdId;
211+
}
212+
213+
async releaseHold(holdId: string): Promise<void> {
214+
if (holdId != this._holdId) {
215+
throw new Error(`Invalid hold state, expected ${this._holdId} but got ${holdId}`);
216+
}
217+
this._holdId = null;
218+
}
219+
175220
protected async openDB() {
176221
this._dbP = await this.sqliteAPI.open_v2(this.options.dbFilename);
177222
return this._dbP;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { ILogger } from '@powersync/common';
2+
import {
3+
AsyncDatabaseConnection,
4+
OnTableChangeCallback,
5+
ProxiedQueryResult
6+
} from '../../db/adapters/AsyncDatabaseConnection';
7+
import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags';
8+
9+
/**
10+
* Keeps track of open DB connections and the clients which
11+
* are using it.
12+
*/
13+
export type SharedDBWorkerConnection = {
14+
clientIds: Set<number>;
15+
db: AsyncDatabaseConnection;
16+
};
17+
18+
export type SharedWASQLiteConnectionOptions = {
19+
dbMap: Map<string, SharedDBWorkerConnection>;
20+
dbFilename: string;
21+
clientId: number;
22+
logger: ILogger;
23+
};
24+
25+
export class SharedWASQLiteConnection implements AsyncDatabaseConnection {
26+
protected isClosing: boolean;
27+
// Keeps track if this current hold if the shared connection has a hold
28+
protected activeHoldId: string | null;
29+
30+
constructor(protected options: SharedWASQLiteConnectionOptions) {
31+
// Add this client ID to the set of known clients
32+
this.clientIds.add(options.clientId);
33+
this.isClosing = false;
34+
this.activeHoldId = null;
35+
}
36+
37+
protected get logger() {
38+
return this.options.logger;
39+
}
40+
41+
protected get dbEntry() {
42+
return this.options.dbMap.get(this.options.dbFilename)!;
43+
}
44+
45+
protected get connection() {
46+
return this.dbEntry.db;
47+
}
48+
49+
protected get clientIds() {
50+
return this.dbEntry.clientIds;
51+
}
52+
53+
async init(): Promise<void> {
54+
// No-op since the connection is already initialized when it was created
55+
}
56+
57+
async markHold(): Promise<string> {
58+
this.activeHoldId = await this.connection.markHold();
59+
return this.activeHoldId;
60+
}
61+
62+
async releaseHold(id: string): Promise<void> {
63+
try {
64+
await this.connection.releaseHold(id);
65+
} finally {
66+
this.activeHoldId = null;
67+
}
68+
}
69+
70+
async isAutoCommit(): Promise<boolean> {
71+
return this.connection.isAutoCommit();
72+
}
73+
74+
/**
75+
* Handles closing of a shared connection.
76+
* The connection is only closed if there are no active clients using it.
77+
*/
78+
async close(): Promise<void> {
79+
// This prevents further statements on this connection from being executed
80+
this.isClosing = true;
81+
const { clientIds, logger } = this;
82+
const { clientId, dbFilename, dbMap } = this.options;
83+
logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`);
84+
clientIds.delete(clientId);
85+
86+
if (this.activeHoldId) {
87+
// We can't cleanup here since we're not in a lock context.
88+
// The cleanup will occur once a new hold is acquired.
89+
this.logger.info(
90+
`Hold ${this.activeHoldId} was still active when the connection was closed. Cleanup will occur once a new hold is acquired.`
91+
);
92+
}
93+
94+
if (clientIds.size == 0) {
95+
logger.debug(`Closing connection to ${this.options}.`);
96+
const connection = this.connection;
97+
dbMap.delete(dbFilename);
98+
await connection.close();
99+
return;
100+
}
101+
logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`);
102+
return;
103+
}
104+
105+
protected async withClosing<T>(action: () => Promise<T>) {
106+
if (this.isClosing) {
107+
throw new Error('Connection is closing');
108+
}
109+
return action();
110+
}
111+
112+
async execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
113+
return this.withClosing(() => this.connection.execute(sql, params));
114+
}
115+
116+
async executeRaw(sql: string, params?: any[]): Promise<any[][]> {
117+
return this.withClosing(() => this.connection.executeRaw(sql, params));
118+
}
119+
120+
executeBatch(sql: string, params?: any[] | undefined): Promise<ProxiedQueryResult> {
121+
return this.withClosing(() => this.connection.executeBatch(sql, params));
122+
}
123+
124+
registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> {
125+
return this.connection.registerOnTableChange(callback);
126+
}
127+
128+
getConfig(): Promise<ResolvedWebSQLOpenOptions> {
129+
return this.connection.getConfig();
130+
}
131+
}

0 commit comments

Comments
 (0)