Skip to content

Commit eff8cbf

Browse files
authored
Sync streams (#707)
1 parent f74aeab commit eff8cbf

29 files changed

+1248
-183
lines changed

.changeset/angry-ducks-sneeze.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
'@powersync/web': minor
5+
---
6+
7+
Add alpha support for sync streams, allowing different sets of data to be synced dynamically.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@ import {
99
UpdateNotification,
1010
isBatchedUpdateNotification
1111
} from '../db/DBAdapter.js';
12-
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
13-
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
12+
import { SyncStatus } from '../db/crud/SyncStatus.js';
1413
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1514
import { Schema } from '../db/schema/Schema.js';
1615
import { BaseObserver } from '../utils/BaseObserver.js';
1716
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1817
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
19-
import { ConnectionManager } from './ConnectionManager.js';
18+
import {
19+
ConnectionManager,
20+
CreateSyncImplementationOptions,
21+
InternalSubscriptionAdapter
22+
} from './ConnectionManager.js';
2023
import { CustomQuery } from './CustomQuery.js';
2124
import { ArrayQueryDefinition, Query } from './Query.js';
2225
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
@@ -40,6 +43,8 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
4043
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
4144
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
4245
import { WatchedQueryComparator } from './watched/processors/comparators.js';
46+
import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js';
47+
import { SyncStream } from './sync/sync-streams.js';
4348

4449
export interface DisconnectAndClearOptions {
4550
/** When set to false, data in local-only tables is preserved. */
@@ -182,6 +187,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
182187
protected bucketStorageAdapter: BucketStorageAdapter;
183188
protected _isReadyPromise: Promise<void>;
184189
protected connectionManager: ConnectionManager;
190+
private subscriptions: InternalSubscriptionAdapter;
185191

186192
get syncStreamImplementation() {
187193
return this.connectionManager.syncStreamImplementation;
@@ -236,10 +242,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
236242
this.runExclusiveMutex = new Mutex();
237243

238244
// Start async init
245+
this.subscriptions = {
246+
firstStatusMatching: (predicate, abort) => this.waitForStatus(predicate, abort),
247+
resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(),
248+
rustSubscriptionsCommand: async (payload) => {
249+
await this.writeTransaction((tx) => {
250+
return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]);
251+
});
252+
}
253+
};
239254
this.connectionManager = new ConnectionManager({
240255
createSyncImplementation: async (connector, options) => {
241256
await this.waitForReady();
242-
243257
return this.runExclusive(async () => {
244258
const sync = this.generateSyncStreamImplementation(connector, this.resolvedConnectionOptions(options));
245259
const onDispose = sync.registerListener({
@@ -304,7 +318,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
304318

305319
protected abstract generateSyncStreamImplementation(
306320
connector: PowerSyncBackendConnector,
307-
options: RequiredAdditionalConnectionOptions
321+
options: CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions
308322
): StreamingSyncImplementation;
309323

310324
protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
@@ -338,13 +352,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
338352
? (status: SyncStatus) => status.hasSynced
339353
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;
340354

341-
if (statusMatches(this.currentStatus)) {
355+
return this.waitForStatus(statusMatches, signal);
356+
}
357+
358+
private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
359+
if (predicate(this.currentStatus)) {
342360
return;
343361
}
362+
344363
return new Promise((resolve) => {
345364
const dispose = this.registerListener({
346365
statusChanged: (status) => {
347-
if (statusMatches(status)) {
366+
if (predicate(status)) {
348367
dispose();
349368
resolve();
350369
}
@@ -373,7 +392,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
373392
await this.bucketStorageAdapter.init();
374393
await this._loadVersion();
375394
await this.updateSchema(this.options.schema);
376-
await this.updateHasSynced();
395+
await this.resolveOfflineSyncStatus();
377396
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
378397
this.ready = true;
379398
this.iterateListeners((cb) => cb.initialized?.());
@@ -403,30 +422,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
403422
}
404423
}
405424

406-
protected async updateHasSynced() {
407-
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
408-
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
409-
);
410-
let lastCompleteSync: Date | undefined;
411-
const priorityStatusEntries: SyncPriorityStatus[] = [];
412-
413-
for (const { priority, last_synced_at } of result) {
414-
const parsedDate = new Date(last_synced_at + 'Z');
425+
protected async resolveOfflineSyncStatus() {
426+
const result = await this.database.get<{ r: string }>('SELECT powersync_offline_sync_status() as r');
427+
const parsed = JSON.parse(result.r) as CoreSyncStatus;
415428

416-
if (priority == FULL_SYNC_PRIORITY) {
417-
// This lowest-possible priority represents a complete sync.
418-
lastCompleteSync = parsedDate;
419-
} else {
420-
priorityStatusEntries.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
421-
}
422-
}
423-
424-
const hasSynced = lastCompleteSync != null;
425429
const updatedStatus = new SyncStatus({
426430
...this.currentStatus.toJSON(),
427-
hasSynced,
428-
priorityStatusEntries,
429-
lastSyncedAt: lastCompleteSync
431+
...coreStatusToJs(parsed)
430432
});
431433

432434
if (!updatedStatus.isEqual(this.currentStatus)) {
@@ -471,7 +473,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
471473
}
472474

473475
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
474-
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
476+
protected resolvedConnectionOptions(
477+
options: CreateSyncImplementationOptions
478+
): CreateSyncImplementationOptions & RequiredAdditionalConnectionOptions {
475479
return {
476480
...options,
477481
retryDelayMs:
@@ -540,6 +544,18 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
540544
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
541545
}
542546

547+
/**
548+
* Create a sync stream to query its status or to subscribe to it.
549+
*
550+
* @param name The name of the stream to subscribe to.
551+
* @param params Optional parameters for the stream subscription.
552+
* @returns A {@link SyncStream} instance that can be subscribed to.
553+
* @experimental Sync streams are currently in alpha.
554+
*/
555+
syncStream(name: string, params?: Record<string, any>): SyncStream {
556+
return this.connectionManager.stream(this.subscriptions, name, params ?? null);
557+
}
558+
543559
/**
544560
* Close the database, releasing resources.
545561
*

packages/common/src/client/ConnectionManager.ts

Lines changed: 158 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,18 @@ import { ILogger } from 'js-logger';
22
import { BaseListener, BaseObserver } from '../utils/BaseObserver.js';
33
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
44
import {
5+
AdditionalConnectionOptions,
56
InternalConnectionOptions,
6-
StreamingSyncImplementation
7+
StreamingSyncImplementation,
8+
SubscribedStream
79
} from './sync/stream/AbstractStreamingSyncImplementation.js';
10+
import {
11+
SyncStream,
12+
SyncStreamDescription,
13+
SyncStreamSubscribeOptions,
14+
SyncStreamSubscription
15+
} from './sync/sync-streams.js';
16+
import { SyncStatus } from '../db/crud/SyncStatus.js';
817

918
/**
1019
* @internal
@@ -18,14 +27,30 @@ export interface ConnectionManagerSyncImplementationResult {
1827
onDispose: () => Promise<void> | void;
1928
}
2029

30+
/**
31+
* The subset of {@link AbstractStreamingSyncImplementationOptions} managed by the connection manager.
32+
*
33+
* @internal
34+
*/
35+
export interface CreateSyncImplementationOptions extends AdditionalConnectionOptions {
36+
subscriptions: SubscribedStream[];
37+
}
38+
39+
export interface InternalSubscriptionAdapter {
40+
firstStatusMatching(predicate: (status: SyncStatus) => any, abort?: AbortSignal): Promise<void>;
41+
resolveOfflineSyncStatus(): Promise<void>;
42+
rustSubscriptionsCommand(payload: any): Promise<void>;
43+
}
44+
2145
/**
2246
* @internal
2347
*/
2448
export interface ConnectionManagerOptions {
2549
createSyncImplementation(
2650
connector: PowerSyncBackendConnector,
27-
options: InternalConnectionOptions
51+
options: CreateSyncImplementationOptions
2852
): Promise<ConnectionManagerSyncImplementationResult>;
53+
2954
logger: ILogger;
3055
}
3156

@@ -76,6 +101,13 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
76101
*/
77102
protected syncDisposer: (() => Promise<void> | void) | null;
78103

104+
/**
105+
* Subscriptions managed in this connection manager.
106+
*
107+
* On the web, these local subscriptions are merged across tabs by a shared worker.
108+
*/
109+
private locallyActiveSubscriptions = new Map<string, ActiveSubscription>();
110+
79111
constructor(protected options: ConnectionManagerOptions) {
80112
super();
81113
this.connectingPromise = null;
@@ -102,7 +134,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
102134
// Update pending options to the latest values
103135
this.pendingConnectionOptions = {
104136
connector,
105-
options: options ?? {}
137+
options
106138
};
107139

108140
// Disconnecting here provides aborting in progress connection attempts.
@@ -169,7 +201,11 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
169201
appliedOptions = options;
170202

171203
this.pendingConnectionOptions = null;
172-
const { sync, onDispose } = await this.options.createSyncImplementation(connector, options);
204+
205+
const { sync, onDispose } = await this.options.createSyncImplementation(connector, {
206+
subscriptions: this.activeStreams,
207+
...options
208+
});
173209
this.iterateListeners((l) => l.syncStreamCreated?.(sync));
174210
this.syncStreamImplementation = sync;
175211
this.syncDisposer = onDispose;
@@ -236,4 +272,122 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
236272
await sync?.dispose();
237273
await disposer?.();
238274
}
275+
276+
stream(adapter: InternalSubscriptionAdapter, name: string, parameters: Record<string, any> | null): SyncStream {
277+
const desc = { name, parameters } satisfies SyncStreamDescription;
278+
279+
const waitForFirstSync = (abort?: AbortSignal) => {
280+
return adapter.firstStatusMatching((s) => s.forStream(desc)?.subscription.hasSynced, abort);
281+
};
282+
283+
return {
284+
...desc,
285+
subscribe: async (options?: SyncStreamSubscribeOptions) => {
286+
// NOTE: We also run this command if a subscription already exists, because this increases the expiry date
287+
// (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl).
288+
await adapter.rustSubscriptionsCommand({
289+
subscribe: {
290+
stream: {
291+
name,
292+
params: parameters
293+
},
294+
ttl: options?.ttl,
295+
priority: options?.priority
296+
}
297+
});
298+
299+
if (!this.syncStreamImplementation) {
300+
// We're not connected. So, update the offline sync status to reflect the new subscription.
301+
// (With an active iteration, the sync client would include it in its state).
302+
await adapter.resolveOfflineSyncStatus();
303+
}
304+
305+
const key = `${name}|${JSON.stringify(parameters)}`;
306+
let subscription = this.locallyActiveSubscriptions.get(key);
307+
if (subscription == null) {
308+
const clearSubscription = () => {
309+
this.locallyActiveSubscriptions.delete(key);
310+
this.subscriptionsMayHaveChanged();
311+
};
312+
313+
subscription = new ActiveSubscription(name, parameters, this.logger, waitForFirstSync, clearSubscription);
314+
this.locallyActiveSubscriptions.set(key, subscription);
315+
this.subscriptionsMayHaveChanged();
316+
}
317+
318+
return new SyncStreamSubscriptionHandle(subscription);
319+
},
320+
unsubscribeAll: async () => {
321+
await adapter.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } });
322+
this.subscriptionsMayHaveChanged();
323+
}
324+
};
325+
}
326+
327+
private get activeStreams() {
328+
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
329+
}
330+
331+
private subscriptionsMayHaveChanged() {
332+
if (this.syncStreamImplementation) {
333+
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
334+
}
335+
}
336+
}
337+
338+
class ActiveSubscription {
339+
refcount: number = 0;
340+
341+
constructor(
342+
readonly name: string,
343+
readonly parameters: Record<string, any> | null,
344+
readonly logger: ILogger,
345+
readonly waitForFirstSync: (abort?: AbortSignal) => Promise<void>,
346+
private clearSubscription: () => void
347+
) {}
348+
349+
decrementRefCount() {
350+
this.refcount--;
351+
if (this.refcount == 0) {
352+
this.clearSubscription();
353+
}
354+
}
355+
}
356+
357+
class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
358+
private active: boolean = true;
359+
360+
constructor(readonly subscription: ActiveSubscription) {
361+
subscription.refcount++;
362+
_finalizer?.register(this, subscription);
363+
}
364+
365+
get name() {
366+
return this.subscription.name;
367+
}
368+
369+
get parameters() {
370+
return this.subscription.parameters;
371+
}
372+
373+
waitForFirstSync(abort?: AbortSignal): Promise<void> {
374+
return this.subscription.waitForFirstSync(abort);
375+
}
376+
377+
unsubscribe(): void {
378+
if (this.active) {
379+
this.active = false;
380+
_finalizer?.unregister(this);
381+
this.subscription.decrementRefCount();
382+
}
383+
}
239384
}
385+
386+
const _finalizer =
387+
'FinalizationRegistry' in globalThis
388+
? new FinalizationRegistry<ActiveSubscription>((sub) => {
389+
sub.logger.warn(
390+
`A subscription to ${sub.name} with params ${JSON.stringify(sub.parameters)} leaked! Please ensure calling unsubscribe() when you don't need a subscription anymore. For global subscriptions, consider storing them in global fields to avoid this warning.`
391+
);
392+
})
393+
: null;

0 commit comments

Comments
 (0)