Skip to content
6 changes: 6 additions & 0 deletions .changeset/silver-insects-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/web': patch
---

- Fixed an issue where IndexedDB could cause "cannot start a transaction within a transaction" errors.
- Improved reconnect logic when multiple tabs are closed.
2 changes: 2 additions & 0 deletions packages/node/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{

const newConnection = async (options?: Partial<NodePowerSyncDatabaseOptions>) => {
const db = await createDatabase(tmpdir, {
// This might help with test stability/timeouts if a retry is needed
retryDelayMs: 100,
...options,
database: {
dbFilename: databaseName,
Expand Down
15 changes: 15 additions & 0 deletions packages/web/src/db/adapters/AsyncDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void;
export interface AsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> {
init(): Promise<void>;
close(): Promise<void>;
/**
* Marks the connection as in-use by a certain actor.
* @returns A hold ID which can be used to release the hold.
*/
markHold(): Promise<string>;
/**
* Releases a hold on the connection.
* @param holdId The hold ID to release.
*/
releaseHold(holdId: string): Promise<void>;
/**
* Checks if the database connection is in autocommit mode.
* @returns true if in autocommit mode, false if in a transaction
*/
isAutoCommit(): Promise<boolean>;
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
executeRaw(sql: string, params?: any[]): Promise<any[][]>;
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
Expand Down
39 changes: 29 additions & 10 deletions packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import {
type ILogger,
BaseObserver,
createLogger,
DBAdapter,
DBAdapterListener,
DBGetUtils,
DBLockOptions,
LockContext,
QueryResult,
Transaction
Transaction,
createLogger,
type ILogger
} from '@powersync/common';
import { getNavigatorLocks } from '../..//shared/navigator';
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';
import { WASQLiteVFS } from './wa-sqlite/WASQLiteConnection';
import { ResolvedWASQLiteOpenFactoryOptions } from './wa-sqlite/WASQLiteOpenFactory';
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';

/**
Expand Down Expand Up @@ -48,6 +50,7 @@ export class LockedAsyncDatabaseAdapter
protected _disposeTableChangeListener: (() => void) | null = null;
private _config: ResolvedWebSQLOpenOptions | null = null;
protected pendingAbortControllers: Set<AbortController>;
protected requiresHolds: boolean | null;

closing: boolean;
closed: boolean;
Expand All @@ -59,6 +62,7 @@ export class LockedAsyncDatabaseAdapter
this.pendingAbortControllers = new Set<AbortController>();
this.closed = false;
this.closing = false;
this.requiresHolds = null;
// Set the name if provided. We can query for the name if not available yet
this.debugMode = options.debugMode ?? false;
if (this.debugMode) {
Expand Down Expand Up @@ -107,6 +111,10 @@ export class LockedAsyncDatabaseAdapter
this._config = await this._db.getConfig();
await this.registerOnChangeListener(this._db);
this.iterateListeners((cb) => cb.initialized?.());
/**
* This is only required for the long-lived shared IndexedDB connections.
*/
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;
}

getConfiguration(): ResolvedWebSQLOpenOptions {
Expand All @@ -125,7 +133,7 @@ export class LockedAsyncDatabaseAdapter
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
throw new Error(`Only worker connections can be shared`);
}
return this._db.shareConnection();
return (this._db as WorkerWrappedAsyncDatabaseConnection).shareConnection();
}

/**
Expand Down Expand Up @@ -221,13 +229,24 @@ export class LockedAsyncDatabaseAdapter
}, timeoutMs)
: null;

return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => {
this.pendingAbortControllers.delete(abortController);
if (timoutId) {
clearTimeout(timoutId);
return getNavigatorLocks().request(
`db-lock-${this._dbIdentifier}`,
{ signal: abortController.signal },
async () => {
this.pendingAbortControllers.delete(abortController);
if (timoutId) {
clearTimeout(timoutId);
}
const holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
try {
return await callback();
} finally {
if (holdId) {
await this.baseDB.releaseHold(holdId);
}
}
}
return callback();
});
);
}

async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,26 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
this.notifyRemoteClosed!.abort();
}

markHold(): Promise<string> {
return this.withRemote(() => this.baseConnection.markHold());
}

releaseHold(holdId: string): Promise<void> {
return this.withRemote(() => this.baseConnection.releaseHold(holdId));
}

isAutoCommit(): Promise<boolean> {
return this.withRemote(() => this.baseConnection.isAutoCommit());
}

private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
const controller = this.notifyRemoteClosed;
if (controller) {
return new Promise((resolve, reject) => {
if (controller.signal.aborted) {
reject(new Error('Called operation on closed remote'));
// Don't run the operation if we're going to reject
return;
}

function handleAbort() {
Expand Down
45 changes: 45 additions & 0 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ export type WASQLiteBroadCastTableUpdateEvent = {
*/
export type WASQLiteConnectionListener = {
tablesUpdated: (event: BatchedUpdateNotification) => void;
/**
* Triggered when an active hold is overwritten by a new hold.
* This is most likely to happen when a shared connection has been closed
* without releasing the hold.
* This listener can be used to cleanup any resources associated with the previous hold.
* @param holdId - The id of the hold that has been overwritten.
*/
holdOverwritten: (holdId: string) => Promise<void>;
};

/**
Expand Down Expand Up @@ -148,6 +156,9 @@ export class WASqliteConnection
*/
protected connectionId: number;

protected _holdCounter: number;
protected _holdId: string | null;

constructor(protected options: ResolvedWASQLiteOpenFactoryOptions) {
super();
this.updatedTables = new Set();
Expand All @@ -156,6 +167,16 @@ export class WASqliteConnection
this.connectionId = new Date().valueOf() + Math.random();
this.statementMutex = new Mutex();
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs];
this._holdCounter = 0;
this._holdId = null;
}

/**
* Gets the id for the current hold.
* This can be used to check for invalid states.
*/
get currentHoldId() {
return this._holdId;
}

protected get sqliteAPI() {
Expand All @@ -172,6 +193,30 @@ export class WASqliteConnection
return this._dbP;
}

/**
* Checks if the database connection is in autocommit mode.
* @returns true if in autocommit mode, false if in a transaction
*/
async isAutoCommit(): Promise<boolean> {
return this.sqliteAPI.get_autocommit(this.dbP) != 0;
}

async markHold(): Promise<string> {
const previousHoldId = this._holdId;
this._holdId = `${++this._holdCounter}`;
if (previousHoldId) {
await this.iterateAsyncListeners(async (cb) => cb.holdOverwritten?.(previousHoldId));
}
return this._holdId;
}

async releaseHold(holdId: string): Promise<void> {
if (holdId != this._holdId) {
throw new Error(`Invalid hold state, expected ${this._holdId} but got ${holdId}`);
}
this._holdId = null;
}

protected async openDB() {
this._dbP = await this.sqliteAPI.open_v2(this.options.dbFilename);
return this._dbP;
Expand Down
131 changes: 131 additions & 0 deletions packages/web/src/worker/db/SharedWASQLiteConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { ILogger } from '@powersync/common';
import {
AsyncDatabaseConnection,
OnTableChangeCallback,
ProxiedQueryResult
} from '../../db/adapters/AsyncDatabaseConnection';
import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags';

/**
* Keeps track of open DB connections and the clients which
* are using it.
*/
export type SharedDBWorkerConnection = {
clientIds: Set<number>;
db: AsyncDatabaseConnection;
};

export type SharedWASQLiteConnectionOptions = {
dbMap: Map<string, SharedDBWorkerConnection>;
dbFilename: string;
clientId: number;
logger: ILogger;
};

export class SharedWASQLiteConnection implements AsyncDatabaseConnection {
protected isClosing: boolean;
// Keeps track if this current hold if the shared connection has a hold
protected activeHoldId: string | null;

constructor(protected options: SharedWASQLiteConnectionOptions) {
// Add this client ID to the set of known clients
this.clientIds.add(options.clientId);
this.isClosing = false;
this.activeHoldId = null;
}

protected get logger() {
return this.options.logger;
}

protected get dbEntry() {
return this.options.dbMap.get(this.options.dbFilename)!;
}

protected get connection() {
return this.dbEntry.db;
}

protected get clientIds() {
return this.dbEntry.clientIds;
}

async init(): Promise<void> {
// No-op since the connection is already initialized when it was created
}

async markHold(): Promise<string> {
this.activeHoldId = await this.connection.markHold();
return this.activeHoldId;
}

async releaseHold(id: string): Promise<void> {
try {
await this.connection.releaseHold(id);
} finally {
this.activeHoldId = null;
}
}

async isAutoCommit(): Promise<boolean> {
return this.connection.isAutoCommit();
}

/**
* Handles closing of a shared connection.
* The connection is only closed if there are no active clients using it.
*/
async close(): Promise<void> {
// This prevents further statements on this connection from being executed
this.isClosing = true;
const { clientIds, logger } = this;
const { clientId, dbFilename, dbMap } = this.options;
logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`);
clientIds.delete(clientId);

if (this.activeHoldId) {
// We can't cleanup here since we're not in a lock context.
// The cleanup will occur once a new hold is acquired.
this.logger.info(
`Hold ${this.activeHoldId} was still active when the connection was closed. Cleanup will occur once a new hold is acquired.`
);
}

if (clientIds.size == 0) {
logger.debug(`Closing connection to ${this.options}.`);
const connection = this.connection;
dbMap.delete(dbFilename);
await connection.close();
return;
}
logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`);
return;
}

protected async withClosing<T>(action: () => Promise<T>) {
if (this.isClosing) {
throw new Error('Connection is closing');
}
return action();
}

async execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.withClosing(() => this.connection.execute(sql, params));
}

async executeRaw(sql: string, params?: any[]): Promise<any[][]> {
return this.withClosing(() => this.connection.executeRaw(sql, params));
}

executeBatch(sql: string, params?: any[] | undefined): Promise<ProxiedQueryResult> {
return this.withClosing(() => this.connection.executeBatch(sql, params));
}

registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> {
return this.connection.registerOnTableChange(callback);
}

getConfig(): Promise<ResolvedWebSQLOpenOptions> {
return this.connection.getConfig();
}
}
Loading