diff --git a/components/ee/db-sync/src/export.ts b/components/ee/db-sync/src/export.ts index f1a98ee7943af5..80f97605650e94 100644 --- a/components/ee/db-sync/src/export.ts +++ b/components/ee/db-sync/src/export.ts @@ -12,7 +12,7 @@ import { injectable, multiInject } from "inversify"; import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; export class TableUpdate { - constructor(protected table: TableDescription, protected start?: Date, protected end?: Date) { } + constructor(protected table: TableDescription, protected start?: Date, protected end?: Date) {} protected columns: string[]; protected updateColumns: string[]; protected _updates?: string[]; @@ -29,7 +29,7 @@ export class TableUpdate { public async populate(conn: Connection): Promise { let description: any[]; try { - description = await query(conn, `DESCRIBE ${this.table.name}`) as any[]; + description = (await query(conn, `DESCRIBE ${this.table.name}`)) as any[]; } catch (err) { // TODO(cw): for the time being we might have more tables in the replication list than we actually have // in the database, due to the gitpod-com move. Once we've resolved this situation, missing tables @@ -41,19 +41,23 @@ export class TableUpdate { } // check that timeColumn is a timestamp - const timeField = description.find(f => f.Field == this.table.timeColumn); - if(!timeField) { - throw new Error(`Table ${this.table.name} has no column ${this.table.timeColumn} configured for time-based sync`); - } else if(!(timeField.Type as string).toLowerCase().startsWith("timestamp")) { - throw new Error(`${this.table.name}'s time-column ${this.table.timeColumn} is not a timestamp, but ${timeField.Type}`); + const timeField = description.find((f) => f.Field == this.table.timeColumn); + if (!timeField) { + throw new Error( + `Table ${this.table.name} has no column ${this.table.timeColumn} configured for time-based sync`, + ); + } else if (!(timeField.Type as string).toLowerCase().startsWith("timestamp")) { + throw new Error( + `${this.table.name}'s time-column ${this.table.timeColumn} is not a timestamp, but ${timeField.Type}`, + ); } - this.columns = description.map(f => f.Field as string); + this.columns = description.map((f) => f.Field as string); this.updateColumns = this.columns // do not update primary keys - .filter(f => !this.table.primaryKeys.find(pk => f == pk)) + .filter((f) => !this.table.primaryKeys.find((pk) => f == pk)) // actually ignore the ignore columns - .filter(f => !(this.table.ignoreColumns || []).find(ic => f == ic)); + .filter((f) => !(this.table.ignoreColumns || []).find((ic) => f == ic)); let dataQueryParams: any[] = []; let timeConditions = []; @@ -62,48 +66,56 @@ export class TableUpdate { dataQueryParams.push(this.start); } if (this.end) { - timeConditions.push(`${this.table.timeColumn} <= ?`); + timeConditions.push(`${this.table.timeColumn} < ?`); dataQueryParams.push(this.end); } if (this.table.expiryColumn) { - timeConditions.push(`${this.table.expiryColumn} >= UNIX_TIMESTAMP(UTC_TIMESTAMP())`) + timeConditions.push(`${this.table.expiryColumn} >= UNIX_TIMESTAMP(UTC_TIMESTAMP())`); } let condition = ""; if (timeConditions.length > 0) { condition = `WHERE ${timeConditions.join(" AND ")}`; } - const dataQuery = `SELECT ${this.columns.map(escapeWithBackticks).join(", ")} FROM \`${this.table.name}\` ${condition}`; + const dataQuery = `SELECT ${this.columns.map(escapeWithBackticks).join(", ")} FROM \`${ + this.table.name + }\` ${condition}`; const deletionsAndUpdates = await new Promise((resolve, reject) => { const updates: string[] = []; const deletions: string[] = []; try { conn.query({ sql: dataQuery, values: dataQueryParams }) .stream() - .pipe(new Transform({ - objectMode: true, - transform: (data: any, encoding, cb) => { - if(data[this.table.timeColumn] > Date.now()) { - const pk = this.table.primaryKeys.map(pk => data[pk]).join(", "); - console.warn(`Row (${this.table.name}: ${pk}) was modified in the future. Possible time sync issue between database and db-sync.`); - } - - const deletionStatement = this.getDeletionStatement(data); - if (deletionStatement) { - deletions.push(deletionStatement); - } - - this.getUpdateStatement(data).forEach(s => updates.push(s)); - - cb(); - } - })) - .on('error', (err) => reject(new Error(`Error while exporting ${this.table.name}: ${err}`))) - .on('finish', () => { - console.debug(`Export of ${this.table.name} done: ${deletions.length} deletions, ${updates.length} updates`); - resolve([ deletions, updates ]); + .pipe( + new Transform({ + objectMode: true, + transform: (data: any, encoding, cb) => { + if (data[this.table.timeColumn] > Date.now()) { + const pk = this.table.primaryKeys.map((pk) => data[pk]).join(", "); + console.warn( + `Row (${this.table.name}: ${pk}) was modified in the future. Possible time sync issue between database and db-sync.`, + ); + } + + const deletionStatement = this.getDeletionStatement(data); + if (deletionStatement) { + deletions.push(deletionStatement); + } + + this.getUpdateStatement(data).forEach((s) => updates.push(s)); + + cb(); + }, + }), + ) + .on("error", (err) => reject(new Error(`Error while exporting ${this.table.name}: ${err}`))) + .on("finish", () => { + console.debug( + `Export of ${this.table.name} done: ${deletions.length} deletions, ${updates.length} updates`, + ); + resolve([deletions, updates]); }); - } catch(err) { + } catch (err) { reject(new Error(`Error while exporting ${this.table.name}: ${err}`)); } }); @@ -113,7 +125,7 @@ export class TableUpdate { } protected shouldDelete(row: any): boolean { - if(!this.table.deletionColumn) { + if (!this.table.deletionColumn) { return false; } @@ -121,7 +133,7 @@ export class TableUpdate { } protected getDeletionStatement(row: any): string | undefined { - if(!this.shouldDelete(row)) { + if (!this.shouldDelete(row)) { return; } @@ -134,22 +146,29 @@ export class TableUpdate { return []; } - const pkValues = this.table.primaryKeys.map(c => escape(row[c], true)); - const updateValues = this.updateColumns.map(c => escape(row[c], true)); - const updates = this.updateColumns.map((c, i) => `${escapeWithBackticks(c)}=${updateValues[i]}`).join(", ") + const pkValues = this.table.primaryKeys.map((c) => escape(row[c], true)); + const updateValues = this.updateColumns.map((c) => escape(row[c], true)); + const updates = this.updateColumns.map((c, i) => `${escapeWithBackticks(c)}=${updateValues[i]}`).join(", "); const updateConditions = this.getUpdateConditions(row); - let result = [`INSERT${forceInsert ? '' : ' IGNORE'} INTO ${this.table.name} (${(this.table.primaryKeys.concat(this.updateColumns)).map(escapeWithBackticks).join(", ")}) VALUES (${(pkValues.concat(updateValues)).join(", ")});`]; - if(!forceInsert) { + let result = [ + `INSERT${forceInsert ? "" : " IGNORE"} INTO ${this.table.name} (${this.table.primaryKeys + .concat(this.updateColumns) + .map(escapeWithBackticks) + .join(", ")}) VALUES (${pkValues.concat(updateValues).join(", ")});`, + ]; + if (!forceInsert) { result.push(`UPDATE ${this.table.name} SET ${updates} WHERE ${updateConditions};`); } return result; } protected getUpdateConditions(row: any): string { - return this.table.primaryKeys.map(pk => `${escapeWithBackticks(pk)}=${escape(row[pk])}`).concat([`${this.table.timeColumn}<=${escape(row[this.table.timeColumn])}`]).join(" AND "); + return this.table.primaryKeys + .map((pk) => `${escapeWithBackticks(pk)}=${escape(row[pk])}`) + .concat([`${this.table.timeColumn}<=${escape(row[this.table.timeColumn])}`]) + .join(" AND "); } - } @injectable() @@ -157,32 +176,38 @@ export class TableUpdateProvider { @multiInject(TableDescriptionProvider) protected readonly descriptionProvider: TableDescriptionProvider[]; - public async getAllStatementsForAllTables(conn: Connection, tableSet?: string, start_date?: Date, end_date?: Date): Promise<{ deletions: string[], updates: string[] }> { - const provider = tableSet ? this.descriptionProvider.find(v => v.name == tableSet) : this.descriptionProvider[0]; - if(!provider) { + public async getAllStatementsForAllTables( + conn: Connection, + tableSet?: string, + start_date?: Date, + end_date?: Date, + ): Promise<{ deletions: string[]; updates: string[] }> { + const provider = tableSet + ? this.descriptionProvider.find((v) => v.name == tableSet) + : this.descriptionProvider[0]; + if (!provider) { throw new Error(`Unknown table set ${tableSet} or no table description providers registered`); } - const tableUpdates = provider.getSortedTables().map(t => new TableUpdate(t, start_date, end_date)); - await Promise.all(tableUpdates.map(t => t.populate(conn))); + const tableUpdates = provider.getSortedTables().map((t) => new TableUpdate(t, start_date, end_date)); + await Promise.all(tableUpdates.map((t) => t.populate(conn))); // when collecting the deletions do so in the inverse order as during update (delete dependency targes first) const deletions = []; - for(var stmts of tableUpdates.reverse()) { - for(var stmt of stmts.deletions || []) { + for (var stmts of tableUpdates.reverse()) { + for (var stmt of stmts.deletions || []) { deletions.push(stmt); } } const updates = []; - for(var stmts of tableUpdates) { - for(var stmt of stmts.updates || []) { + for (var stmts of tableUpdates) { + for (var stmt of stmts.updates || []) { updates.push(stmt); } } return { deletions, updates }; } - } function escapeWithBackticks(val: string): string { - return "`" + val + "`" + return "`" + val + "`"; } diff --git a/components/ee/db-sync/src/replication.ts b/components/ee/db-sync/src/replication.ts index 5d10f8e613520a..1e69475ca8a4c9 100644 --- a/components/ee/db-sync/src/replication.ts +++ b/components/ee/db-sync/src/replication.ts @@ -70,39 +70,30 @@ export class PeriodicReplicator { } public async synchronize(ignoreStartDate: boolean): Promise { - const now = new Date(); - let previousRun = await this.getLastExportDate(); - console.info(`Replicating ${this.toString()}: last ran on ${previousRun}`); - if (ignoreStartDate) { - if (previousRun && previousRun > now) { - console.warn( - `Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`, - ); - } - - console.info("Synchronizing complete database (ignoring previous run)"); - previousRun = undefined; - } else if (previousRun && previousRun > now) { - throw new Error( - `Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`, - ); + const period = await this.getSyncPeriod({ ignoreStartDate, offsetFromNowSeconds: 5 }); + if (period === undefined) { + console.info("Cannot find a valid sync period; skipping this time."); + return; } + console.info(`Replicating: syncing period ${SyncPeriod.toString(period)}`); const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables( this.source, this.tableSet, - previousRun, + period.from, + period.to, ); const deletions = modifications.deletions; const updates = modifications.updates; const total = [...deletions, ...updates]; console.debug(`Collected ${total.length} statements`); try { - /* nowait */ this.logStatements(now, total); + /* nowait */ this.logStatements(period.to, total); await Promise.all([this.source, ...this.targets].map((target) => this.update(target, deletions))); await Promise.all(this.targets.map((target) => this.update(target, updates))); - await this.markLastExportDate(now); + + await this.markLastExportDate(period.to); } catch (err) { console.error("Error during replication", err); } @@ -175,6 +166,47 @@ export class PeriodicReplicator { } } + protected async getSyncPeriod(options: { + ignoreStartDate: boolean; + offsetFromNowSeconds: number; + }): Promise { + let previousRun = await this.getLastExportDate(); + if (options.ignoreStartDate) { + console.info("Synchronizing database from beginning of time (ignoring previous run)"); + previousRun = undefined; + } + + const periodEnd = await this.getNextPeriodEnd(options.offsetFromNowSeconds); + if (previousRun && periodEnd.getTime() <= previousRun.getTime()) { + console.warn( + `Period end (now - ${ + options.offsetFromNowSeconds + }s) '${periodEnd.toISOString()}' <= previous run '${previousRun.toISOString()}', no valid SyncPeriod.`, + ); + return undefined; + } + + return { + to: periodEnd, + from: previousRun, + }; + } + + protected async getNextPeriodEnd(offsetSeconds: number): Promise { + // End of syncperiod: now - Xs, DB clock + const rows = (await query( + this.source, + // Why CURRENT_TIMESTAMP(3) you ask? Well, because we want to match the precision of getLastExportDate, which is (indirectly) governed by "toISOString" in "markLastExportDate" below. + "SELECT SUBTIME(CURRENT_TIMESTAMP(3), SEC_TO_TIME(?)) as upperBound", + { values: [offsetSeconds] }, // seconds in the past + )) as { upperBound: string | undefined }[]; + const upperBound = rows[0].upperBound; + if (!upperBound) { + throw new Error("Unable to retrieve next period end: " + JSON.stringify(rows)); + } + return new Date(upperBound); + } + protected async getLastExportDate(): Promise { try { const rows = (await query( @@ -252,3 +284,15 @@ export class PeriodicReplicator { return `${this.source.name} -> [ ${this.targets.map((t) => t.name).join(", ")} ]`; } } + +interface SyncPeriod { + to: Date; + from: Date | undefined; +} + +namespace SyncPeriod { + export function toString(p: SyncPeriod): string { + const from = p.from ? p.from.toISOString() : "---"; + return `${from} -> ${p.to.toISOString()}`; + } +}