diff --git a/.changeset/wise-elephants-trade.md b/.changeset/wise-elephants-trade.md
new file mode 100644
index 00000000..b57c9998
--- /dev/null
+++ b/.changeset/wise-elephants-trade.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-mysql': minor
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core': patch
+---
+
+- Hooked up the MySQL binlog heartbeat events to the bucket batch keepalive mechanism.
+  Heartbeat events will now update the latest keepalive timestamp in the sync rules.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
index ceee3542..6952092c 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
@@ -807,7 +807,7 @@ export class MongoBucketBatch
   }
 
   async keepalive(lsn: string): Promise<boolean> {
-    if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
+    if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
       // No-op
       return false;
     }
diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json
index 3d11f5a2..8fbcf281 100644
--- a/modules/module-mysql/package.json
+++ b/modules/module-mysql/package.json
@@ -33,7 +33,7 @@
     "@powersync/service-sync-rules": "workspace:*",
     "@powersync/service-types": "workspace:*",
     "@powersync/service-jsonbig": "workspace:*",
-    "@powersync/mysql-zongji": "^0.4.0",
+    "@powersync/mysql-zongji": "^0.5.0",
    "async": "^3.2.4",
    "mysql2": "^3.11.0",
    "node-sql-parser": "^5.3.9",
diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts
index d97aa528..06f5dfb9 100644
--- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts
+++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts
@@ -21,7 +21,9 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
     return this.options.storage.slot_name;
   }
 
-  async keepAlive() {}
+  async keepAlive() {
+    // Keepalives are handled by the binlog heartbeat mechanism
+  }
 
   async replicate() {
     try {
@@ -56,6 +58,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
     const connectionManager = this.connectionFactory.create({
       // Pool connections are only used intermittently.
       idleTimeout: 30_000,
+      connectionLimit: 2,
 
       connectAttributes: {
         // https://dev.mysql.com/doc/refman/8.0/en/performance-schema-connection-attribute-tables.html
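Editor's note on the storage-side change above: the keepalive guard was relaxed from `<=` to `<`, so a keepalive at an LSN *equal* to the last checkpoint is no longer a no-op. That matters for MySQL because heartbeats re-send the current GTID, making equal LSNs the common case between transactions. A minimal sketch of the semantics — the `Batch` class here is an illustrative stand-in, not the real `MongoBucketBatch`/`PostgresBucketBatch`:

```ts
// Stand-in sketch of the keepalive guard; not the real storage implementation.
class Batch {
  lastCheckpointLsn: string | null = null;
  lastKeepaliveAt: Date | null = null;

  async keepalive(lsn: string): Promise<boolean> {
    // New guard (`<`): lsn === lastCheckpointLsn falls through and still
    // refreshes the keepalive timestamp. The old `<=` made it a no-op.
    if (this.lastCheckpointLsn != null && lsn < this.lastCheckpointLsn) {
      return false; // Strictly stale LSN: nothing to do.
    }
    this.lastCheckpointLsn = lsn;
    this.lastKeepaliveAt = new Date(); // Ultimately feeds last_keepalive_ts in the sync rules.
    return true;
  }
}

async function demo() {
  const batch = new Batch();
  console.log(await batch.keepalive('0000000001/0000000000001000')); // true
  // An idle MySQL server heartbeats the same GTID repeatedly:
  console.log(await batch.keepalive('0000000001/0000000000001000')); // true with `<`; false with the old `<=`
}
demo();
```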
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 7543432c..3d4c8756 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -398,7 +398,6 @@ export class BinLogStream {
     const fromGTID = checkpoint_lsn
       ? common.ReplicatedGTID.fromSerialized(checkpoint_lsn)
       : await common.readExecutedGtid(connection);
-    const binLogPositionState = fromGTID.position;
     connection.release();
 
     if (!this.stopped) {
@@ -409,7 +408,7 @@ export class BinLogStream {
       const binlogListener = new BinLogListener({
         logger: this.logger,
         sourceTables: this.syncRules.getSourceTables(),
-        startPosition: binLogPositionState,
+        startGTID: fromGTID,
         connectionManager: this.connections,
         serverId: serverId,
         eventHandler: binlogEventHandler
@@ -455,6 +454,12 @@ export class BinLogStream {
             tableEntry: tableMap
           });
         },
+        onKeepAlive: async (lsn: string) => {
+          const didCommit = await batch.keepalive(lsn);
+          if (didCommit) {
+            this.oldestUncommittedChange = null;
+          }
+        },
         onCommit: async (lsn: string) => {
           this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
           const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange });
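Editor's note: `onKeepAlive` hands `batch.keepalive` the value of `currentGTID.comparable`, and the `<` guard shown earlier compares those values as plain strings. That only works if the encoding makes lexicographic order match replication order. The toy below shows the zero-padding trick that gives strings this property; the actual `common.ReplicatedGTID.comparable` encoding is more involved, so treat this purely as an illustration:

```ts
// Toy "comparable LSN": zero-pad numeric parts to a fixed width so that
// lexicographic string order equals numeric replication order.
// Assumption: this is NOT the real ReplicatedGTID format.
function comparableLsn(binlogFileIndex: number, offset: number): string {
  const file = binlogFileIndex.toString().padStart(8, '0');
  const pos = offset.toString().padStart(16, '0');
  return `${file}/${pos}`;
}

const a = comparableLsn(1, 4096);
const b = comparableLsn(1, 8192);
const c = comparableLsn(2, 0);

console.log(a < b); // true: larger offset within the same binlog file
console.log(b < c); // true: a later binlog file wins regardless of offset
// Without the padding, '10/...' would sort before '2/...' and break the guard.
```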
diff --git a/modules/module-mysql/src/replication/zongji/BinLogListener.ts b/modules/module-mysql/src/replication/zongji/BinLogListener.ts
index f7b2ccd3..fc10eadb 100644
--- a/modules/module-mysql/src/replication/zongji/BinLogListener.ts
+++ b/modules/module-mysql/src/replication/zongji/BinLogListener.ts
@@ -29,6 +29,10 @@ import { TablePattern } from '@powersync/service-sync-rules';
 
 const { Parser } = pkg;
 
+/**
+ * Seconds of inactivity after which a keepalive event is sent by the MySQL server.
+ */
+export const KEEPALIVE_INACTIVITY_THRESHOLD = 30;
 export type Row = Record<string, any>;
 
 /**
@@ -65,6 +69,7 @@ export interface BinLogEventHandler {
   onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise<void>;
   onCommit: (lsn: string) => Promise<void>;
   onSchemaChange: (change: SchemaChange) => Promise<void>;
+  onKeepAlive: (lsn: string) => Promise<void>;
 }
 
 export interface BinLogListenerOptions {
@@ -72,8 +77,9 @@ export interface BinLogListenerOptions {
   eventHandler: BinLogEventHandler;
   sourceTables: TablePattern[];
   serverId: number;
-  startPosition: common.BinLogPosition;
+  startGTID: common.ReplicatedGTID;
   logger?: Logger;
+  keepAliveInactivitySeconds?: number;
 }
 
 /**
@@ -85,16 +91,19 @@ export class BinLogListener {
   private connectionManager: MySQLConnectionManager;
   private eventHandler: BinLogEventHandler;
   private binLogPosition: common.BinLogPosition;
-  private currentGTID: common.ReplicatedGTID | null;
+  private currentGTID: common.ReplicatedGTID;
 
   private logger: Logger;
   private listenerError: Error | null;
   private databaseFilter: { [schema: string]: (table: string) => boolean };
+  private isStopped: boolean = false;
+  private isStopping: boolean = false;
+
+  // Flag to indicate if we are currently in a transaction that involves multiple row mutation events.
+  private isTransactionOpen = false;
 
   zongji: ZongJi;
   processingQueue: async.QueueObject<BinLogEvent>;
-  isStopped: boolean = false;
-  isStopping: boolean = false;
   /**
    * The combined size in bytes of all the binlog events currently in the processing queue.
    */
@@ -104,8 +113,8 @@ export class BinLogListener {
     this.logger = options.logger ?? defaultLogger;
     this.connectionManager = options.connectionManager;
    this.eventHandler = options.eventHandler;
-    this.binLogPosition = options.startPosition;
-    this.currentGTID = null;
+    this.binLogPosition = options.startGTID.position;
+    this.currentGTID = options.startGTID;
     this.sqlParser = new Parser();
     this.processingQueue = this.createProcessingQueue();
     this.zongji = this.createZongjiListener();
@@ -130,14 +139,13 @@ export class BinLogListener {
       `${isRestart ? 'Restarting' : 'Starting'} BinLog Listener with replica client id:${this.options.serverId}...`
     );
 
-    // Set a heartbeat interval for the Zongji replication connection
-    // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
-    // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
+    // Set a heartbeat interval for the Zongji replication connection; these events are enough to keep the connection
+    // alive for setTimeout to work on the socket.
     // The heartbeat needs to be set before starting the listener, since the replication connection is locked once replicating
     await new Promise<void>((resolve, reject) => {
       this.zongji.connection.query(
         // In nanoseconds, 10^9 = 1s
-        'set @master_heartbeat_period=28*1000000000',
+        `set @master_heartbeat_period=${this.options.keepAliveInactivitySeconds ?? KEEPALIVE_INACTIVITY_THRESHOLD}*1000000000`,
         (error: any, results: any, _fields: any) => {
           if (error) {
             reject(error);
@@ -158,9 +166,19 @@ export class BinLogListener {
     });
 
     this.zongji.start({
-      // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive
-      // tablemap events always need to be included for the other row events to work
-      includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog', 'query'],
+      // Tablemap events always need to be included for the other row events to work
+      includeEvents: [
+        'tablemap',
+        'writerows',
+        'updaterows',
+        'deleterows',
+        'xid',
+        'rotate',
+        'gtidlog',
+        'query',
+        'heartbeat',
+        'heartbeat_v2'
+      ],
       includeSchema: this.databaseFilter,
      filename: this.binLogPosition.filename,
      position: this.binLogPosition.offset,
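Editor's note on the `@master_heartbeat_period` line above: MySQL expects the value in nanoseconds, which is why the listener multiplies the configured seconds by 10^9 inside the SQL string. A small sketch of that computation — the helper name is mine, not part of the module:

```ts
// Mirrors the query string built in BinLogListener.start(); hypothetical helper.
const KEEPALIVE_INACTIVITY_THRESHOLD = 30; // seconds, as exported above

function heartbeatPeriodQuery(seconds: number = KEEPALIVE_INACTIVITY_THRESHOLD): string {
  // MySQL interprets @master_heartbeat_period in nanoseconds (10^9 ns = 1 s).
  // Keeping the multiplication inside the SQL sidesteps any JS precision concerns.
  return `set @master_heartbeat_period=${seconds}*1000000000`;
}

console.log(heartbeatPeriodQuery());  // set @master_heartbeat_period=30*1000000000
console.log(heartbeatPeriodQuery(1)); // handy for tests that want a heartbeat every second
```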
@@ -289,19 +307,24 @@ export class BinLogListener {
           this.logger.info(`Processed GTID event: ${this.currentGTID.comparable}`);
           break;
         case zongji_utils.eventIsRotation(evt):
+          // The first event when starting replication is a synthetic Rotate event.
+          // It describes the last binlog file and position that the replica client processed.
           const newFile = this.binLogPosition.filename !== evt.binlogName;
           this.binLogPosition.filename = evt.binlogName;
+          this.binLogPosition.offset = evt.nextPosition !== 0 ? evt.nextPosition : evt.position;
           await this.eventHandler.onRotate();
           if (newFile) {
             this.logger.info(
               `Processed Rotate event. New BinLog file is: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`
             );
           }
+          break;
         case zongji_utils.eventIsWriteMutation(evt):
           const tableMap = evt.tableMap[evt.tableId];
           await this.eventHandler.onWrite(evt.rows, tableMap);
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Write event for table [${tableMap.parentSchema}.${tableMap.tableName}]. ${evt.rows.length} row(s) inserted.`
           );
@@ -312,20 +335,33 @@ export class BinLogListener {
             evt.rows.map((row) => row.before),
             evt.tableMap[evt.tableId]
           );
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Update event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) updated.`
           );
           break;
         case zongji_utils.eventIsDeleteMutation(evt):
           await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]);
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Delete event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) deleted.`
           );
           break;
+        case zongji_utils.eventIsHeartbeat(evt):
+        case zongji_utils.eventIsHeartbeat_v2(evt):
+          // Heartbeats are sent by the master to keep the connection alive after a period of inactivity. They are
+          // synthetic events, so they are not written to the binlog and have no effect on the binlog position.
+          // We forward them along with the current GTID to the event handler, unless a transaction is in progress.
+          if (!this.isTransactionOpen) {
+            await this.eventHandler.onKeepAlive(this.currentGTID.comparable);
+          }
+          this.logger.debug(`Processed Heartbeat event. Current GTID is: ${this.currentGTID.comparable}`);
+          break;
         case zongji_utils.eventIsXid(evt):
+          this.isTransactionOpen = false;
           this.binLogPosition.offset = evt.nextPosition;
           const LSN = new common.ReplicatedGTID({
-            raw_gtid: this.currentGTID!.raw,
+            raw_gtid: this.currentGTID.raw,
             position: this.binLogPosition
           }).comparable;
           await this.eventHandler.onCommit(LSN);
@@ -336,8 +372,6 @@ export class BinLogListener {
           break;
       }
 
-      // Update the binlog position after processing the event
-      this.binLogPosition.offset = evt.nextPosition;
       this.queueMemoryUsage -= evt.size;
     };
   }
@@ -345,14 +379,15 @@ export class BinLogListener {
   private async processQueryEvent(event: BinLogQueryEvent): Promise<void> {
     const { query, nextPosition } = event;
 
-    // BEGIN query events mark the start of a transaction before any row events. They are not relevant for schema changes
+    // BEGIN query events mark the start of a transaction before any row events. They are not schema changes, so no further parsing is necessary.
     if (query === 'BEGIN') {
+      this.isTransactionOpen = true;
       return;
     }
 
     const schemaChanges = this.toSchemaChanges(query, event.schema);
     if (schemaChanges.length > 0) {
-      // Since handling the schema changes can take a long time, we need to stop the Zongji listener instead of pausing it.
+      // Handling schema changes can take a long time, so we stop the Zongji listener while handling them to prevent the listener from timing out.
       await this.stopZongji();
 
       for (const change of schemaChanges) {
@@ -360,19 +395,21 @@ export class BinLogListener {
         await this.eventHandler.onSchemaChange(change);
       }
 
-      // DDL queries are auto commited, but do not come with a corresponding Xid event.
-      // This is problematic for DDL queries which result in row events because the checkpoint is not moved on,
-      // so we manually commit here.
-      this.binLogPosition.offset = nextPosition;
-      const LSN = new common.ReplicatedGTID({
-        raw_gtid: this.currentGTID!.raw,
-        position: this.binLogPosition
-      }).comparable;
-      await this.eventHandler.onCommit(LSN);
+      // DDL queries are auto committed, but do not come with a corresponding Xid event; in that case we trigger a manual commit, as long as we are not already inside a transaction.
+      // Some DDL queries include row events, and in those cases an Xid event will follow.
+      if (!this.isTransactionOpen) {
+        this.binLogPosition.offset = nextPosition;
+        const LSN = new common.ReplicatedGTID({
+          raw_gtid: this.currentGTID.raw,
+          position: this.binLogPosition
+        }).comparable;
+        await this.eventHandler.onCommit(LSN);
+      }
 
       this.logger.info(`Successfully processed ${schemaChanges.length} schema change(s).`);
 
       // If there are still events in the processing queue, we need to process those before restarting Zongji
+      // This avoids potentially processing the same events again after a restart.
       if (!this.processingQueue.idle()) {
         this.logger.info(`Processing [${this.processingQueue.length()}] events(s) before resuming...`);
         this.processingQueue.drain(async () => {
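Editor's note: the `isTransactionOpen` flag introduced above is what keeps heartbeats from emitting a keepalive mid-transaction — a keepalive LSN recorded between `BEGIN` and the `Xid` commit could otherwise advance the keepalive timestamp past work that has not committed yet. A condensed model of that gating, with event shapes simplified to plain objects rather than the real zongji events:

```ts
// Condensed model of the transaction gating in BinLogListener.
// Event shapes are simplified stand-ins, not the zongji event classes.
type Evt =
  | { kind: 'begin' }                   // QueryEvent with query === 'BEGIN'
  | { kind: 'xid'; lsn: string }        // transaction commit
  | { kind: 'heartbeat'; gtid: string };

class KeepaliveGate {
  private isTransactionOpen = false;
  keepalives: string[] = [];
  commits: string[] = [];

  handle(evt: Evt): void {
    switch (evt.kind) {
      case 'begin':
        this.isTransactionOpen = true;
        break;
      case 'xid':
        this.isTransactionOpen = false;
        this.commits.push(evt.lsn);
        break;
      case 'heartbeat':
        // Same rule as onKeepAlive above: dropped while a transaction is open.
        if (!this.isTransactionOpen) this.keepalives.push(evt.gtid);
        break;
    }
  }
}

const gate = new KeepaliveGate();
gate.handle({ kind: 'begin' });
gate.handle({ kind: 'heartbeat', gtid: 'gtid-A' }); // ignored: transaction open
gate.handle({ kind: 'xid', lsn: 'lsn-1' });
gate.handle({ kind: 'heartbeat', gtid: 'gtid-B' }); // forwarded
console.log(gate.keepalives); // ['gtid-B']
```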
@@ -381,6 +418,13 @@ export class BinLogListener {
       } else {
         await this.restartZongji();
       }
+    } else if (!this.isTransactionOpen) {
+      this.binLogPosition.offset = nextPosition;
+      const LSN = new common.ReplicatedGTID({
+        raw_gtid: this.currentGTID.raw,
+        position: this.binLogPosition
+      }).comparable;
+      await this.eventHandler.onCommit(LSN);
     }
   }
 
diff --git a/modules/module-mysql/src/replication/zongji/zongji-utils.ts b/modules/module-mysql/src/replication/zongji/zongji-utils.ts
index b1d2c579..79b44010 100644
--- a/modules/module-mysql/src/replication/zongji/zongji-utils.ts
+++ b/modules/module-mysql/src/replication/zongji/zongji-utils.ts
@@ -6,7 +6,9 @@ import {
   BinLogTableMapEvent,
   BinLogRowUpdateEvent,
   BinLogXidEvent,
-  BinLogQueryEvent
+  BinLogQueryEvent,
+  BinLogHeartbeatEvent,
+  BinLogHeartbeatEvent_V2
 } from '@powersync/mysql-zongji';
 
 export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent {
@@ -21,6 +23,14 @@ export function eventIsXid(event: BinLogEvent): event is BinLogXidEvent {
   return event.getEventName() == 'xid';
 }
 
+export function eventIsHeartbeat(event: BinLogEvent): event is BinLogHeartbeatEvent {
+  return event.getEventName() == 'heartbeat';
+}
+
+export function eventIsHeartbeat_v2(event: BinLogEvent): event is BinLogHeartbeatEvent_V2 {
+  return event.getEventName() == 'heartbeat_v2';
+}
+
 export function eventIsRotation(event: BinLogEvent): event is BinLogRotationEvent {
   return event.getEventName() == 'rotate';
 }
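Editor's note: the two new helpers follow the same `event is T` type-guard pattern as the rest of `zongji-utils.ts`, which is what lets the `switch (true)` dispatch in `BinLogListener` narrow `evt` per branch. A self-contained sketch of the pattern with stand-in types (TypeScript 5.3+ narrows `switch (true)` cases directly):

```ts
// Stand-ins for the zongji event types; only getEventName() matters here.
interface BinLogEvent {
  getEventName(): string;
}
interface BinLogHeartbeatEvent extends BinLogEvent {
  heartbeat?: true; // marker property, purely for the sake of the example
}

function eventIsHeartbeat(event: BinLogEvent): event is BinLogHeartbeatEvent {
  return event.getEventName() == 'heartbeat';
}

// The switch (true) dispatch style used by BinLogListener: each case is a
// type-guard call, so evt has the narrowed type inside the matching branch.
function dispatch(evt: BinLogEvent): void {
  switch (true) {
    case eventIsHeartbeat(evt):
      console.log('heartbeat received'); // evt: BinLogHeartbeatEvent here
      break;
    default:
      console.log(`unhandled event: ${evt.getEventName()}`);
  }
}

dispatch({ getEventName: () => 'heartbeat' });
dispatch({ getEventName: () => 'xid' });
```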
diff --git a/modules/module-mysql/test/src/BinLogListener.test.ts b/modules/module-mysql/test/src/BinLogListener.test.ts
index c7447dd7..9fb75fc5 100644
--- a/modules/module-mysql/test/src/BinLogListener.test.ts
+++ b/modules/module-mysql/test/src/BinLogListener.test.ts
@@ -1,22 +1,15 @@
 import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest';
-import {
-  BinLogEventHandler,
-  BinLogListener,
-  Row,
-  SchemaChange,
-  SchemaChangeType
-} from '@module/replication/zongji/BinLogListener.js';
+import { BinLogListener, SchemaChange, SchemaChangeType } from '@module/replication/zongji/BinLogListener.js';
 import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js';
-import { clearTestDb, createTestDb, TEST_CONNECTION_OPTIONS } from './util.js';
-import { v4 as uuid } from 'uuid';
-import * as common from '@module/common/common-index.js';
 import {
-  createRandomServerId,
-  getMySQLVersion,
-  qualifiedMySQLTable,
-  satisfiesVersion
-} from '@module/utils/mysql-utils.js';
-import { TableMapEntry } from '@powersync/mysql-zongji';
+  clearTestDb,
+  createBinlogListener,
+  createTestDb,
+  TEST_CONNECTION_OPTIONS,
+  TestBinLogEventHandler
+} from './util.js';
+import { v4 as uuid } from 'uuid';
+import { getMySQLVersion, qualifiedMySQLTable, satisfiesVersion } from '@module/utils/mysql-utils.js';
 import crypto from 'crypto';
 import { TablePattern } from '@powersync/service-sync-rules';
@@ -46,7 +39,11 @@ describe('BinlogListener tests', () => {
     await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`);
     connection.release();
     eventHandler = new TestBinLogEventHandler();
-    binLogListener = await createBinlogListener();
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      sourceTables: [new TablePattern(connectionManager.databaseName, 'test_DATA')],
+      eventHandler
+    });
   });
 
   afterAll(async () => {
@@ -106,6 +103,14 @@ describe('BinlogListener tests', () => {
     await binLogListener.stop();
   });
 
+  test('Keepalive event', async () => {
+    binLogListener.options.keepAliveInactivitySeconds = 1;
+    await binLogListener.start();
+    await vi.waitFor(() => expect(eventHandler.lastKeepAlive).toBeDefined(), { timeout: 10000 });
+    await binLogListener.stop();
+    expect(eventHandler.lastKeepAlive).toEqual(binLogListener.options.startGTID.comparable);
+  });
+
   test('Schema change event: Rename table', async () => {
     await binLogListener.start();
     await connectionManager.query(`ALTER TABLE test_DATA RENAME test_DATA_new`);
@@ -276,7 +281,11 @@ describe('BinlogListener tests', () => {
   test('Schema change event: Drop and Add primary key', async () => {
     await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`);
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
     await binLogListener.start();
     await connectionManager.query(`ALTER TABLE test_constraints ADD PRIMARY KEY (id)`);
     await connectionManager.query(`ALTER TABLE test_constraints DROP PRIMARY KEY`);
@@ -301,7 +310,11 @@ describe('BinlogListener tests', () => {
   test('Schema change event: Add and drop unique constraint', async () => {
     await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`);
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
     await binLogListener.start();
     await connectionManager.query(`ALTER TABLE test_constraints ADD UNIQUE (description)`);
     await connectionManager.query(`ALTER TABLE test_constraints DROP INDEX description`);
@@ -326,7 +339,11 @@ describe('BinlogListener tests', () => {
   test('Schema change event: Add and drop a unique index', async () => {
     await connectionManager.query(`CREATE TABLE test_constraints (id CHAR(36), description VARCHAR(100))`);
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_constraints')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
     await binLogListener.start();
     await connectionManager.query(`CREATE UNIQUE INDEX description_idx ON test_constraints (description)`);
     await connectionManager.query(`DROP INDEX description_idx ON test_constraints`);
@@ -367,7 +384,11 @@ describe('BinlogListener tests', () => {
     // If there are multiple schema changes in the binlog processing queue, we only restart the binlog listener once
     // all the schema changes have been processed
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_multiple')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
 
     await connectionManager.query(`CREATE TABLE test_multiple (id CHAR(36), description VARCHAR(100))`);
     await connectionManager.query(`ALTER TABLE test_multiple ADD COLUMN new_column VARCHAR(10)`);
@@ -388,7 +409,11 @@ describe('BinlogListener tests', () => {
   test('Unprocessed binlog event received that does not match the current table schema', async () => {
     // If we process a binlog event for a table which has since had its schema changed, we expect the binlog listener to stop with an error
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_failure')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
 
     await connectionManager.query(`CREATE TABLE test_failure (id CHAR(36), description VARCHAR(100))`);
     await connectionManager.query(`INSERT INTO test_failure(id, description) VALUES('${uuid()}','test_failure')`);
@@ -403,7 +428,11 @@ describe('BinlogListener tests', () => {
   test('Unprocessed binlog event received for a dropped table', async () => {
     const sourceTables = [new TablePattern(connectionManager.databaseName, 'test_failure')];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
 
     // If we process a binlog event for a table which has since been dropped, we expect the binlog listener to stop with an error
     await connectionManager.query(`CREATE TABLE test_failure (id CHAR(36), description VARCHAR(100))`);
@@ -424,7 +453,11 @@ describe('BinlogListener tests', () => {
       new TablePattern(connectionManager.databaseName, 'test_DATA'),
       new TablePattern('multi_schema', 'test_DATA_multi')
     ];
-    binLogListener = await createBinlogListener(sourceTables);
+    binLogListener = await createBinlogListener({
+      connectionManager,
+      eventHandler,
+      sourceTables
+    });
 
     await binLogListener.start();
     // Default database insert into test_DATA
@@ -439,28 +472,6 @@ describe('BinlogListener tests', () => {
     assertSchemaChange(eventHandler.schemaChanges[0], SchemaChangeType.DROP_TABLE, 'multi_schema', 'test_DATA_multi');
   });
 
-  async function createBinlogListener(
-    sourceTables?: TablePattern[],
-    startPosition?: common.BinLogPosition
-  ): Promise<BinLogListener> {
-    if (!sourceTables) {
-      sourceTables = [new TablePattern(connectionManager.databaseName, 'test_DATA')];
-    }
-
-    if (!startPosition) {
-      const fromGTID = await getFromGTID(connectionManager);
-      startPosition = fromGTID.position;
-    }
-
-    return new BinLogListener({
-      connectionManager: connectionManager,
-      eventHandler: eventHandler,
-      startPosition: startPosition,
-      sourceTables: sourceTables,
-      serverId: createRandomServerId(1)
-    });
-  }
-
   function assertSchemaChange(
     change: SchemaChange,
     type: SchemaChangeType,
@@ -477,14 +488,6 @@ describe('BinlogListener tests', () => {
   }
 });
 
-async function getFromGTID(connectionManager: MySQLConnectionManager) {
-  const connection = await connectionManager.getConnection();
-  const fromGTID = await common.readExecutedGtid(connection);
-  connection.release();
-
-  return fromGTID;
-}
-
 async function insertRows(connectionManager: MySQLConnectionManager, count: number) {
   for (let i = 0; i < count; i++) {
     await connectionManager.query(
@@ -500,45 +503,3 @@ async function updateRows(connectionManager: MySQLConnectionManager) {
 async function deleteRows(connectionManager: MySQLConnectionManager) {
   await connectionManager.query(`DELETE FROM test_DATA`);
 }
-
-class TestBinLogEventHandler implements BinLogEventHandler {
-  rowsWritten = 0;
-  rowsUpdated = 0;
-  rowsDeleted = 0;
-  commitCount = 0;
-  schemaChanges: SchemaChange[] = [];
-
-  unpause: ((value: void | PromiseLike<void>) => void) | undefined;
-  private pausedPromise: Promise<void> | undefined;
-
-  pause() {
-    this.pausedPromise = new Promise((resolve) => {
-      this.unpause = resolve;
-    });
-  }
-
-  async onWrite(rows: Row[], tableMap: TableMapEntry) {
-    if (this.pausedPromise) {
-      await this.pausedPromise;
-    }
-    this.rowsWritten = this.rowsWritten + rows.length;
-  }
-
-  async onUpdate(afterRows: Row[], beforeRows: Row[], tableMap: TableMapEntry) {
-    this.rowsUpdated = this.rowsUpdated + afterRows.length;
-  }
-
-  async onDelete(rows: Row[], tableMap: TableMapEntry) {
-    this.rowsDeleted = this.rowsDeleted + rows.length;
-  }
-
-  async onCommit(lsn: string) {
-    this.commitCount++;
-  }
-
-  async onSchemaChange(change: SchemaChange) {
-    this.schemaChanges.push(change);
-  }
-  async onTransactionStart(options: { timestamp: Date }) {}
-  async onRotate() {}
-}
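Editor's note: with `TestBinLogEventHandler` and `createBinlogListener` now exported from `util.ts`, a keepalive-focused test reads roughly like the condensed sketch below. `vi.waitFor` retries its callback until the assertion stops throwing or the timeout elapses; `connectionManager` is assumed to come from the suite's `beforeAll`, as in the tests above:

```ts
import { expect, test, vi } from 'vitest';
import { createBinlogListener, TestBinLogEventHandler } from './util.js';
import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js';
import { TablePattern } from '@powersync/service-sync-rules';

// Assumed to be initialized in beforeAll(), as in the suite above.
declare const connectionManager: MySQLConnectionManager;

test('keepalive reports the current GTID', async () => {
  const eventHandler = new TestBinLogEventHandler();
  const binLogListener = await createBinlogListener({
    connectionManager,
    eventHandler,
    sourceTables: [new TablePattern(connectionManager.databaseName, 'test_DATA')]
  });

  // Shorten the heartbeat period so the test does not idle for 30 seconds.
  binLogListener.options.keepAliveInactivitySeconds = 1;
  await binLogListener.start();

  await vi.waitFor(() => expect(eventHandler.lastKeepAlive).toBeDefined(), { timeout: 10_000 });
  await binLogListener.stop();

  // With no writes in between, the keepalive LSN is still the starting GTID.
  expect(eventHandler.lastKeepAlive).toEqual(binLogListener.options.startGTID.comparable);
});
```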
diff --git a/modules/module-mysql/test/src/schema-changes.test.ts b/modules/module-mysql/test/src/schema-changes.test.ts
index 511ca902..99cb25e9 100644
--- a/modules/module-mysql/test/src/schema-changes.test.ts
+++ b/modules/module-mysql/test/src/schema-changes.test.ts
@@ -652,12 +652,8 @@ function defineTests(factory: storage.TestStorageFactory) {
     await connectionManager.query(`INSERT INTO ${testTable}(id, description) VALUES('t3','test3')`);
     await connectionManager.query(`DROP TABLE ${testTable}`);
 
-    // Force a commit on the watched schema to advance the checkpoint
-    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('t1','test1')`);
-
     const data = await context.getBucketData('global[]');
-    // Should only include the entry used to advance the checkpoint
-    expect(data).toMatchObject([PUT_T1]);
+    expect(data).toMatchObject([]);
   });
 }
diff --git a/modules/module-mysql/test/src/util.ts b/modules/module-mysql/test/src/util.ts
index 8f1bab67..cb72b12b 100644
--- a/modules/module-mysql/test/src/util.ts
+++ b/modules/module-mysql/test/src/util.ts
@@ -1,5 +1,5 @@
 import * as types from '@module/types/types.js';
-import { getMySQLVersion, isVersionAtLeast } from '@module/utils/mysql-utils.js';
+import { createRandomServerId, getMySQLVersion, isVersionAtLeast } from '@module/utils/mysql-utils.js';
 import * as mongo_storage from '@powersync/service-module-mongodb-storage';
 import * as postgres_storage from '@powersync/service-module-postgres-storage';
 import mysqlPromise from 'mysql2/promise';
@@ -7,6 +7,10 @@ import { env } from './env.js';
 import { describe, TestOptions } from 'vitest';
 import { TestStorageFactory } from '@powersync/service-core';
 import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js';
+import { BinLogEventHandler, BinLogListener, Row, SchemaChange } from '@module/replication/zongji/BinLogListener.js';
+import { TableMapEntry } from '@powersync/mysql-zongji';
+import * as common from '@module/common/common-index.js';
+import { TablePattern } from '@powersync/service-sync-rules';
 
 export const TEST_URI = env.MYSQL_TEST_URI;
 
@@ -58,3 +62,79 @@ export async function createTestDb(connectionManager: MySQLConnectionManager, db
   await connectionManager.query(`DROP DATABASE IF EXISTS ${dbName}`);
   await connectionManager.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`);
 }
+
+export async function getFromGTID(connectionManager: MySQLConnectionManager) {
+  const connection = await connectionManager.getConnection();
+  const fromGTID = await common.readExecutedGtid(connection);
+  connection.release();
+
+  return fromGTID;
+}
+
+export interface CreateBinlogListenerParams {
+  connectionManager: MySQLConnectionManager;
+  eventHandler: BinLogEventHandler;
+  sourceTables: TablePattern[];
+  startGTID?: common.ReplicatedGTID;
+}
+
+export async function createBinlogListener(params: CreateBinlogListenerParams): Promise<BinLogListener> {
+  let { connectionManager, eventHandler, sourceTables, startGTID } = params;
+
+  if (!startGTID) {
+    startGTID = await getFromGTID(connectionManager);
+  }
+
+  return new BinLogListener({
+    connectionManager: connectionManager,
+    eventHandler: eventHandler,
+    startGTID: startGTID!,
+    sourceTables: sourceTables,
+    serverId: createRandomServerId(1)
+  });
+}
+
+export class TestBinLogEventHandler implements BinLogEventHandler {
+  rowsWritten = 0;
+  rowsUpdated = 0;
+  rowsDeleted = 0;
+  commitCount = 0;
+  schemaChanges: SchemaChange[] = [];
+  lastKeepAlive: string | undefined;
+
+  unpause: ((value: void | PromiseLike<void>) => void) | undefined;
+  private pausedPromise: Promise<void> | undefined;
+
+  pause() {
+    this.pausedPromise = new Promise((resolve) => {
+      this.unpause = resolve;
+    });
+  }
+
+  async onWrite(rows: Row[], tableMap: TableMapEntry) {
+    if (this.pausedPromise) {
+      await this.pausedPromise;
+    }
+    this.rowsWritten = this.rowsWritten + rows.length;
+  }
+
+  async onUpdate(afterRows: Row[], beforeRows: Row[], tableMap: TableMapEntry) {
+    this.rowsUpdated = this.rowsUpdated + afterRows.length;
+  }
+
+  async onDelete(rows: Row[], tableMap: TableMapEntry) {
+    this.rowsDeleted = this.rowsDeleted + rows.length;
+  }
+
+  async onCommit(lsn: string) {
+    this.commitCount++;
+  }
+
+  async onSchemaChange(change: SchemaChange) {
+    this.schemaChanges.push(change);
+  }
+
+  async onTransactionStart(options: { timestamp: Date }) {}
+
+  async onRotate() {}
+
+  async onKeepAlive(lsn: string) {
+    this.lastKeepAlive = lsn;
+  }
+}
diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
index 29b8d4c7..0764496c 100644
--- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
+++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
@@ -378,7 +378,7 @@ export class PostgresBucketBatch
   }
 
   async keepalive(lsn: string): Promise<boolean> {
-    if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
+    if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
       // No-op
       return false;
     }
diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts
index 82de45b4..8b4fd75e 100644
--- a/packages/service-core/src/api/diagnostics.ts
+++ b/packages/service-core/src/api/diagnostics.ts
@@ -134,7 +134,6 @@ export async function getSyncRulesStatus(
     })
   );
 
-  if (live_status && status?.active && sourceConfig.type != 'mysql') {
+  if (live_status && status?.active) {
     // Check replication lag for active sync rules.
-    // Right now we exclude mysql, since it we don't have consistent keepalives for it.
     if (sync_rules.last_checkpoint_ts == null && sync_rules.last_keepalive_ts == null) {
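Editor's note: dropping `sourceConfig.type != 'mysql'` (along with the now-stale comment about excluding MySQL) means the replication-lag check runs for MySQL as well, which is only sound because heartbeats keep `last_keepalive_ts` fresh between transactions. A simplified model of the timestamp reasoning — field names follow the diff, but the lag computation itself is an assumption, not the actual service-core code:

```ts
// Simplified model of the replication-lag check in getSyncRulesStatus.
interface SyncRulesRow {
  last_checkpoint_ts: Date | null;
  last_keepalive_ts: Date | null;
}

function replicationLagMs(syncRules: SyncRulesRow, now = new Date()): number | null {
  const timestamps = [syncRules.last_checkpoint_ts, syncRules.last_keepalive_ts].filter(
    (ts): ts is Date => ts != null
  );
  if (timestamps.length === 0) {
    // Neither timestamp recorded yet. Before this PR this was chronic for idle
    // MySQL sources, since nothing ever called keepalive() between transactions.
    return null;
  }
  const latest = Math.max(...timestamps.map((ts) => ts.getTime()));
  return now.getTime() - latest;
}

console.log(replicationLagMs({ last_checkpoint_ts: null, last_keepalive_ts: null })); // null: lag unknown
console.log(
  replicationLagMs({
    last_checkpoint_ts: new Date(Date.now() - 60_000),
    last_keepalive_ts: new Date(Date.now() - 1_000) // heartbeats keep this recent
  })
); // ~1000
```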
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index ff286745..aef6017f 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -258,8 +258,8 @@ importers:
         specifier: workspace:*
         version: link:../../libs/lib-services
       '@powersync/mysql-zongji':
-        specifier: ^0.4.0
-        version: 0.4.0
+        specifier: ^0.5.0
+        version: 0.5.0
       '@powersync/service-core':
         specifier: workspace:*
         version: link:../../packages/service-core
@@ -1290,8 +1290,8 @@ packages:
     resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==}
     engines: {node: '>=12'}
 
-  '@powersync/mysql-zongji@0.4.0':
-    resolution: {integrity: sha512-O5zGYF3mzHO50SOSj3/6EnXYebC2Lvu1BTashbbz6eLAwaR3TkxwMfPGqFEsuecIm22djSBlgXjKM2FzVVG/VQ==}
+  '@powersync/mysql-zongji@0.5.0':
+    resolution: {integrity: sha512-wJfZFIsNK8aIyGYvTvz98wxNeo2tv7i05pSkPTEVOeOcsbEF8+CO0vRJpGOLr5or1+BzGDlpUkxJU21JbKFBzA==}
     engines: {node: '>=22.0.0'}
 
   '@prisma/instrumentation@5.16.1':
@@ -4762,7 +4762,7 @@ snapshots:
     '@pnpm/network.ca-file': 1.0.2
     config-chain: 1.1.13
 
-  '@powersync/mysql-zongji@0.4.0':
+  '@powersync/mysql-zongji@0.5.0':
     dependencies:
       '@vlasky/mysql': 2.18.6
       big-integer: 1.6.52
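Editor's note, tying the whole change together: the new keepalive path is MySQL heartbeat → `BinLogListener` → `onKeepAlive(currentGTID.comparable)` → `BinLogStream` → `batch.keepalive(lsn)` → `last_keepalive_ts` in the sync rules. A compact sketch of that wiring with stand-in types — the real interfaces live in `BinLogListener.ts` and `BinLogStream.ts` above:

```ts
// Stand-in types; the real ones are BinLogEventHandler and the storage batches.
interface KeepAliveHandler {
  onKeepAlive: (lsn: string) => Promise<void>;
}
interface BucketBatchLike {
  keepalive(lsn: string): Promise<boolean>; // true if the keepalive was persisted
}

// Mirrors the onKeepAlive wiring added in BinLogStream.ts.
function makeKeepAliveHandler(
  batch: BucketBatchLike,
  state: { oldestUncommittedChange: Date | null }
): KeepAliveHandler {
  return {
    onKeepAlive: async (lsn) => {
      const didCommit = await batch.keepalive(lsn);
      if (didCommit) {
        // A persisted keepalive means there is no pending uncommitted work.
        state.oldestUncommittedChange = null;
      }
    }
  };
}

async function demo() {
  const state: { oldestUncommittedChange: Date | null } = { oldestUncommittedChange: new Date() };
  const handler = makeKeepAliveHandler({ keepalive: async () => true }, state);
  await handler.onKeepAlive('gtid#0001');
  console.log(state.oldestUncommittedChange); // null: the keepalive cleared it
}
demo();
```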