7 changes: 7 additions & 0 deletions .changeset/spicy-vans-matter.md
@@ -0,0 +1,7 @@
---
'@powersync/service-core': minor
'@powersync/service-types': minor
'@powersync/service-image': minor
---

Support gzip and zstd compression in HTTP streams.
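
For context, clients opt in to compression through the standard `Accept-Encoding` request header; the service negotiates between `identity`, `gzip` and `zstd` and reports the chosen encoding in the `Content-Encoding` response header. A minimal client-side sketch follows; the host, port, authorization and request body are placeholder assumptions, and only the `/sync/stream` path and the header negotiation come from this change:

```ts
// Hypothetical client: request a gzip-compressed sync stream and decompress it locally.
import * as http from 'node:http';
import * as zlib from 'node:zlib';
import { createInterface } from 'node:readline';

const req = http.request(
  {
    host: 'localhost', // placeholder
    port: 8080, // placeholder
    path: '/sync/stream',
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: 'Token <jwt>', // placeholder
      // Ask for a compressed stream; without this header the response stays uncompressed.
      'accept-encoding': 'gzip'
    }
  },
  (res) => {
    // The service reports the negotiated encoding in the response headers.
    const encoding = res.headers['content-encoding'];
    const body = encoding === 'gzip' ? res.pipe(zlib.createGunzip()) : res;
    // Each line of the (decompressed) default JSON stream is one sync line.
    createInterface({ input: body }).on('line', (line) => console.log(JSON.parse(line)));
  }
);
req.end(JSON.stringify({})); // real requests include the streaming sync parameters
```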
6 changes: 6 additions & 0 deletions packages/service-core/src/api/api-metrics.ts
@@ -12,6 +12,12 @@ export function createCoreAPIMetrics(engine: MetricsEngine): void {
    unit: 'bytes'
  });

  engine.createCounter({
    name: APIMetric.DATA_SENT_BYTES,
    description: 'Size of data sent to clients, after compression if applicable',
    unit: 'bytes'
  });

  engine.createCounter({
    name: APIMetric.OPERATIONS_SYNCED,
    description: 'Number of operations synced'
78 changes: 78 additions & 0 deletions packages/service-core/src/routes/compression.ts
@@ -0,0 +1,78 @@
import { PassThrough, pipeline, Readable, Transform } from 'node:stream';
import type Negotiator from 'negotiator';
import * as zlib from 'node:zlib';
import { RequestTracker } from '../sync/RequestTracker.js';

/**
 * Compress a streamed response.
 *
 * `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
 * The manual implementation is simple enough, and gives us more control over the low-level details.
 *
 * @param negotiator Negotiator from the request, to negotiate the response encoding
 * @param stream plain-text stream
 * @param tracker used to track the number of compressed bytes sent
 * @returns the possibly-compressed stream, and the content-encoding header to set on the response
 */
export function maybeCompressResponseStream(
  negotiator: Negotiator,
  stream: Readable,
  tracker: RequestTracker
): { stream: Readable; encodingHeaders: { 'content-encoding'?: string } } {
  const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'zstd' });
  if (encoding == 'zstd') {
    tracker.setCompressed(encoding);
    return {
      stream: transform(
        stream,
        // Available since Node v23.8.0, v22.15.0.
        // This does the actual compression in a background thread pool.
        zlib.createZstdCompress({
          // We need to flush the frame after every new input chunk, to avoid delaying data
          // in the output stream.
          flush: zlib.constants.ZSTD_e_flush,
          params: {
            // The default compression level is 3. We reduce this slightly to limit CPU overhead.
            [zlib.constants.ZSTD_c_compressionLevel]: 2
          }
        }),
        tracker
      ),
      encodingHeaders: { 'content-encoding': 'zstd' }
    };
  } else if (encoding == 'gzip') {
    tracker.setCompressed(encoding);
    return {
      stream: transform(
        stream,
        zlib.createGzip({
          // We need to flush the frame after every new input chunk, to avoid delaying data
          // in the output stream.
          flush: zlib.constants.Z_SYNC_FLUSH
        }),
        tracker
      ),
      encodingHeaders: { 'content-encoding': 'gzip' }
    };
  } else {
    return {
      stream: stream,
      encodingHeaders: {}
    };
  }
}

function transform(source: Readable, transform: Transform, tracker: RequestTracker) {
  // pipe does not forward error events automatically, which results in unhandled error
  // events. Using pipeline with a callback forwards errors to the output stream instead.
  const out = new PassThrough();
  const trackingTransform = new Transform({
    transform(chunk, encoding, callback) {
      tracker.addCompressedDataSent(chunk.length);
      callback(null, chunk);
    }
  });
  pipeline(source, transform, trackingTransform, out, (err) => {
    if (err) out.destroy(err);
  });
  return out;
}
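
A standalone sketch of the flush behaviour these options rely on (the written lines are illustrative, not real sync lines): with a sync flush configured, every chunk written to the compressor is emitted promptly instead of being buffered until the stream ends, which is what keeps a long-lived sync stream responsive.

```ts
// Demonstrates the effect of the flush option used above, shown here for gzip.
import * as zlib from 'node:zlib';

const gzip = zlib.createGzip({
  // With Z_SYNC_FLUSH (or ZSTD_e_flush for zstd), each written chunk is flushed
  // through to the output instead of waiting for more input or for end().
  flush: zlib.constants.Z_SYNC_FLUSH
});

gzip.on('data', (chunk: Buffer) => {
  // Fires shortly after each write() below, so downstream consumers see data without delay.
  console.log(`compressed chunk: ${chunk.length} bytes`);
});

gzip.write('{"example":"sync line 1"}\n');
setTimeout(() => {
  gzip.write('{"example":"sync line 2"}\n');
  gzip.end();
}, 1000);
```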
2 changes: 1 addition & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
@@ -113,7 +113,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
const serialized = sync.syncLineToBson(data);
responder.onNext({ data: serialized }, false);
requestedN--;
tracker.addDataSynced(serialized.length);
tracker.addPlaintextDataSynced(serialized.length);
}

if (requestedN <= 0 && !signal.aborted) {
15 changes: 9 additions & 6 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
@@ -1,7 +1,6 @@
import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { Readable } from 'stream';
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
import Negotiator from 'negotiator';
import { Readable } from 'stream';

import * as sync from '../../sync/sync-index.js';
import * as util from '../../util/util-index.js';
@@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';

import { APIMetric } from '@powersync/service-types';
import { maybeCompressResponseStream } from '../compression.js';

export enum SyncRoutes {
STREAM = '/sync/stream'
@@ -31,9 +31,10 @@ export const syncStreamed = routeDefinition({
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
const clientId = payload.params.client_id;
const streamStart = Date.now();
const negotiator = new Negotiator(payload.request);
// This falls back to JSON unless there's a preference for the bson-stream in the Accept header.
const useBson = payload.request.headers.accept
? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
? negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType
: false;

logger.defaultMeta = {
@@ -81,10 +82,11 @@ });
});

const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
objectMode: false,
highWaterMark: 16 * 1024
});
const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream, tracker);

// Best effort guess on why the stream was closed.
// We use the `??=` operator everywhere, so that we catch the first relevant
@@ -119,7 +121,8 @@
return new router.RouterResponse({
status: 200,
headers: {
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
...encodingHeaders
},
data: stream,
afterSend: async (details) => {
29 changes: 27 additions & 2 deletions packages/service-core/src/sync/RequestTracker.ts
@@ -2,16 +2,20 @@ import { MetricsEngine } from '../metrics/MetricsEngine.js';

import { APIMetric } from '@powersync/service-types';
import { SyncBucketData } from '../util/protocol-types.js';
import { ServiceAssertionError } from '@powersync/lib-services-framework';

/**
* Record sync stats per request stream.
*/
export class RequestTracker {
operationsSynced = 0;
dataSyncedBytes = 0;
dataSentBytes = 0;
operationCounts: OperationCounts = { put: 0, remove: 0, move: 0, clear: 0 };
largeBuckets: Record<string, number> = {};

private encoding: string | undefined = undefined;

constructor(private metrics: MetricsEngine) {
this.metrics = metrics;
}
@@ -29,18 +33,39 @@ export class RequestTracker {
this.metrics.getCounter(APIMetric.OPERATIONS_SYNCED).add(operations.total);
}

addDataSynced(bytes: number) {
setCompressed(encoding: string) {
this.encoding = encoding;
}

addPlaintextDataSynced(bytes: number) {
this.dataSyncedBytes += bytes;

this.metrics.getCounter(APIMetric.DATA_SYNCED_BYTES).add(bytes);

if (this.encoding == null) {
// Without compression, the bytes sent equal the plaintext bytes. Counting them here
// avoids creating a separate tracking stream just for this.
this.dataSentBytes += bytes;

this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
}
}

addCompressedDataSent(bytes: number) {
if (this.encoding == null) {
throw new ServiceAssertionError('No compression encoding set');
}
this.dataSentBytes += bytes;
this.metrics.getCounter(APIMetric.DATA_SENT_BYTES).add(bytes);
}

getLogMeta() {
return {
operations_synced: this.operationsSynced,
data_synced_bytes: this.dataSyncedBytes,
data_sent_bytes: this.dataSentBytes,
operation_counts: this.operationCounts,
large_buckets: this.largeBuckets
large_buckets: this.largeBuckets,
encoding: this.encoding
};
}
}
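
To make the split between the two counters concrete, here is a short sketch of how a request stream drives the tracker; the byte sizes are illustrative, and the metrics engine is assumed to be initialized as in `createCoreAPIMetrics`:

```ts
import { MetricsEngine } from '../metrics/MetricsEngine.js';
import { RequestTracker } from './RequestTracker.js';

declare const metricsEngine: MetricsEngine; // assumed to be created elsewhere

const tracker = new RequestTracker(metricsEngine);

// Set by maybeCompressResponseStream when a compressed encoding is negotiated.
tracker.setCompressed('gzip');

// transformToBytesTracked reports plaintext bytes as they are produced...
tracker.addPlaintextDataSynced(16 * 1024);
// ...while the tracking transform reports the bytes actually sent on the wire (illustrative size).
tracker.addCompressedDataSent(3 * 1024);

// getLogMeta() now includes data_synced_bytes: 16384, data_sent_bytes: 3072 and encoding: 'gzip'.
// Without setCompressed(), data_sent_bytes would simply mirror data_synced_bytes.
console.log(tracker.getLogMeta());
```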
2 changes: 1 addition & 1 deletion packages/service-core/src/sync/util.ts
@@ -125,7 +125,7 @@ export async function* transformToBytesTracked(
encoded = data;
}

tracker.addDataSynced(encoded.length);
tracker.addPlaintextDataSynced(encoded.length);
yield encoded;
}
}
59 changes: 59 additions & 0 deletions packages/service-core/test/src/routes/mocks.ts
@@ -0,0 +1,59 @@
import {
  BucketStorageFactory,
  createCoreAPIMetrics,
  MetricsEngine,
  OpenTelemetryMetricsFactory,
  RouteAPI,
  RouterEngine,
  ServiceContext,
  StorageEngine,
  SyncContext,
  SyncRulesBucketStorage
} from '@/index.js';
import { MeterProvider } from '@opentelemetry/sdk-metrics';

export function mockServiceContext(storage: Partial<SyncRulesBucketStorage> | null) {
  // This is very incomplete - just enough to get the current tests passing.

  const storageEngine: StorageEngine = {
    activeBucketStorage: {
      async getActiveStorage() {
        return storage;
      }
    } as Partial<BucketStorageFactory>
  } as any;

  const meterProvider = new MeterProvider({
    readers: []
  });
  const meter = meterProvider.getMeter('powersync-tests');
  const metricsEngine = new MetricsEngine({
    disable_telemetry_sharing: true,
    factory: new OpenTelemetryMetricsFactory(meter)
  });
  createCoreAPIMetrics(metricsEngine);
  const service_context: Partial<ServiceContext> = {
    syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }),
    routerEngine: {
      getAPI() {
        return {
          getParseSyncRulesOptions() {
            return { defaultSchema: 'public' };
          }
        } as Partial<RouteAPI>;
      },
      addStopHandler() {
        return () => {};
      }
    } as Partial<RouterEngine> as any,
    storageEngine,
    metricsEngine: metricsEngine,
    // Not used
    configuration: null as any,
    lifeCycleEngine: null as any,
    migrations: null as any,
    replicationEngine: null as any,
    serviceMode: null as any
  };
  return service_context as ServiceContext;
}
84 changes: 84 additions & 0 deletions packages/service-core/test/src/routes/stream.test.ts
@@ -0,0 +1,84 @@
import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js';
import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework';
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js';
import { mockServiceContext } from './mocks.js';

describe('Stream Route', () => {
  describe('compressed stream', () => {
    it('handles missing sync rules', async () => {
      const context: Context = {
        logger: logger,
        service_context: mockServiceContext(null)
      };

      const request: BasicRouterRequest = {
        headers: {},
        hostname: '',
        protocol: 'http'
      };

      const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>).catch(
        (e) => e
      )) as ServiceError;

      expect(error.errorData.status).toEqual(500);
      expect(error.errorData.code).toEqual('PSYNC_S2302');
    });

    it('handles a stream error with compression', async () => {
      // This primarily tests that an underlying storage error doesn't result in an uncaught error
      // when compressing the stream.

      const storage = {
        getParsedSyncRules() {
          return new SqlSyncRules('bucket_definitions: {}');
        },
        watchCheckpointChanges: async function* (options) {
          throw new Error('Simulated storage error');
        }
      } as Partial<SyncRulesBucketStorage>;
      const serviceContext = mockServiceContext(storage);

      const context: Context = {
        logger: logger,
        service_context: serviceContext,
        token_payload: {
          exp: new Date().getTime() / 1000 + 10000,
          iat: new Date().getTime() / 1000 - 10000,
          sub: 'test-user'
        }
      };

      // It may be worth eventually doing this via Fastify to test the full stack.

      const request: BasicRouterRequest = {
        headers: {
          'accept-encoding': 'gzip'
        },
        hostname: '',
        protocol: 'http'
      };

      const response = await (syncStreamed.handler({ context, params: {}, request }) as Promise<RouterResponse>);
      expect(response.status).toEqual(200);
      const stream = response.data as Readable;
      const r = await drainWithTimeout(stream).catch((error) => error);
      expect(r.message).toContain('Simulated storage error');
    });
  });
});

export async function drainWithTimeout(readable: Readable, ms = 2_000) {
  const devNull = new Writable({
    write(_chunk, _enc, cb) {
      cb();
    } // discard everything
  });

  // Throws AbortError if it takes longer than ms, and destroys the stream
  await pipeline(readable, devNull, { signal: AbortSignal.timeout(ms) });
}
2 changes: 2 additions & 0 deletions packages/types/src/metrics.ts
@@ -1,6 +1,8 @@
export enum APIMetric {
// Uncompressed size of synced data from PowerSync to Clients
DATA_SYNCED_BYTES = 'powersync_data_synced_bytes_total',
// Potentially-compressed size of data sent from PowerSync to Clients
DATA_SENT_BYTES = 'powersync_data_sent_bytes_total',
// Number of operations synced
OPERATIONS_SYNCED = 'powersync_operations_synced_total',
// Number of concurrent sync connections