Skip to content

[db-sync] Introduce SyncPeriod (with upper limit now - 5s) #14568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 81 additions & 56 deletions components/ee/db-sync/src/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { injectable, multiInject } from "inversify";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";

export class TableUpdate {
constructor(protected table: TableDescription, protected start?: Date, protected end?: Date) { }
constructor(protected table: TableDescription, protected start?: Date, protected end?: Date) {}
protected columns: string[];
protected updateColumns: string[];
protected _updates?: string[];
Expand All @@ -29,7 +29,7 @@ export class TableUpdate {
public async populate(conn: Connection): Promise<void> {
let description: any[];
try {
description = await query(conn, `DESCRIBE ${this.table.name}`) as any[];
description = (await query(conn, `DESCRIBE ${this.table.name}`)) as any[];
} catch (err) {
// TODO(cw): for the time being we might have more tables in the replication list than we actually have
// in the database, due to the gitpod-com move. Once we've resolved this situation, missing tables
Expand All @@ -41,19 +41,23 @@ export class TableUpdate {
}

// check that timeColumn is a timestamp
const timeField = description.find(f => f.Field == this.table.timeColumn);
if(!timeField) {
throw new Error(`Table ${this.table.name} has no column ${this.table.timeColumn} configured for time-based sync`);
} else if(!(timeField.Type as string).toLowerCase().startsWith("timestamp")) {
throw new Error(`${this.table.name}'s time-column ${this.table.timeColumn} is not a timestamp, but ${timeField.Type}`);
const timeField = description.find((f) => f.Field == this.table.timeColumn);
if (!timeField) {
throw new Error(
`Table ${this.table.name} has no column ${this.table.timeColumn} configured for time-based sync`,
);
} else if (!(timeField.Type as string).toLowerCase().startsWith("timestamp")) {
throw new Error(
`${this.table.name}'s time-column ${this.table.timeColumn} is not a timestamp, but ${timeField.Type}`,
);
}

this.columns = description.map(f => f.Field as string);
this.columns = description.map((f) => f.Field as string);
this.updateColumns = this.columns
// do not update primary keys
.filter(f => !this.table.primaryKeys.find(pk => f == pk))
.filter((f) => !this.table.primaryKeys.find((pk) => f == pk))
// actually ignore the ignore columns
.filter(f => !(this.table.ignoreColumns || []).find(ic => f == ic));
.filter((f) => !(this.table.ignoreColumns || []).find((ic) => f == ic));

let dataQueryParams: any[] = [];
let timeConditions = [];
Expand All @@ -62,48 +66,56 @@ export class TableUpdate {
dataQueryParams.push(this.start);
}
if (this.end) {
timeConditions.push(`${this.table.timeColumn} <= ?`);
timeConditions.push(`${this.table.timeColumn} < ?`);
dataQueryParams.push(this.end);
}
if (this.table.expiryColumn) {
timeConditions.push(`${this.table.expiryColumn} >= UNIX_TIMESTAMP(UTC_TIMESTAMP())`)
timeConditions.push(`${this.table.expiryColumn} >= UNIX_TIMESTAMP(UTC_TIMESTAMP())`);
}
let condition = "";
if (timeConditions.length > 0) {
condition = `WHERE ${timeConditions.join(" AND ")}`;
}

const dataQuery = `SELECT ${this.columns.map(escapeWithBackticks).join(", ")} FROM \`${this.table.name}\` ${condition}`;
const dataQuery = `SELECT ${this.columns.map(escapeWithBackticks).join(", ")} FROM \`${
this.table.name
}\` ${condition}`;
const deletionsAndUpdates = await new Promise<string[][]>((resolve, reject) => {
const updates: string[] = [];
const deletions: string[] = [];
try {
conn.query({ sql: dataQuery, values: dataQueryParams })
.stream()
.pipe(new Transform({
objectMode: true,
transform: (data: any, encoding, cb) => {
if(data[this.table.timeColumn] > Date.now()) {
const pk = this.table.primaryKeys.map(pk => data[pk]).join(", ");
console.warn(`Row (${this.table.name}: ${pk}) was modified in the future. Possible time sync issue between database and db-sync.`);
}

const deletionStatement = this.getDeletionStatement(data);
if (deletionStatement) {
deletions.push(deletionStatement);
}

this.getUpdateStatement(data).forEach(s => updates.push(s));

cb();
}
}))
.on('error', (err) => reject(new Error(`Error while exporting ${this.table.name}: ${err}`)))
.on('finish', () => {
console.debug(`Export of ${this.table.name} done: ${deletions.length} deletions, ${updates.length} updates`);
resolve([ deletions, updates ]);
.pipe(
new Transform({
objectMode: true,
transform: (data: any, encoding, cb) => {
if (data[this.table.timeColumn] > Date.now()) {
const pk = this.table.primaryKeys.map((pk) => data[pk]).join(", ");
console.warn(
`Row (${this.table.name}: ${pk}) was modified in the future. Possible time sync issue between database and db-sync.`,
);
}

const deletionStatement = this.getDeletionStatement(data);
if (deletionStatement) {
deletions.push(deletionStatement);
}

this.getUpdateStatement(data).forEach((s) => updates.push(s));

cb();
},
}),
)
.on("error", (err) => reject(new Error(`Error while exporting ${this.table.name}: ${err}`)))
.on("finish", () => {
console.debug(
`Export of ${this.table.name} done: ${deletions.length} deletions, ${updates.length} updates`,
);
resolve([deletions, updates]);
});
} catch(err) {
} catch (err) {
reject(new Error(`Error while exporting ${this.table.name}: ${err}`));
}
});
Expand All @@ -113,15 +125,15 @@ export class TableUpdate {
}

/**
 * Tells whether the given row is flagged as deleted.
 *
 * @param row a row read from this table
 * @returns `false` when the table has no deletion column configured; otherwise
 *          the truthiness of the row's value in that column
 */
protected shouldDelete(row: any): boolean {
    const deletionColumn = this.table.deletionColumn;
    // Without a configured deletion column there is nothing that could mark a row as deleted.
    return deletionColumn ? !!row[deletionColumn] : false;
}

protected getDeletionStatement(row: any): string | undefined {
if(!this.shouldDelete(row)) {
if (!this.shouldDelete(row)) {
return;
}

Expand All @@ -134,55 +146,68 @@ export class TableUpdate {
return [];
}

const pkValues = this.table.primaryKeys.map(c => escape(row[c], true));
const updateValues = this.updateColumns.map(c => escape(row[c], true));
const updates = this.updateColumns.map((c, i) => `${escapeWithBackticks(c)}=${updateValues[i]}`).join(", ")
const pkValues = this.table.primaryKeys.map((c) => escape(row[c], true));
const updateValues = this.updateColumns.map((c) => escape(row[c], true));
const updates = this.updateColumns.map((c, i) => `${escapeWithBackticks(c)}=${updateValues[i]}`).join(", ");
const updateConditions = this.getUpdateConditions(row);

let result = [`INSERT${forceInsert ? '' : ' IGNORE'} INTO ${this.table.name} (${(this.table.primaryKeys.concat(this.updateColumns)).map(escapeWithBackticks).join(", ")}) VALUES (${(pkValues.concat(updateValues)).join(", ")});`];
if(!forceInsert) {
let result = [
`INSERT${forceInsert ? "" : " IGNORE"} INTO ${this.table.name} (${this.table.primaryKeys
.concat(this.updateColumns)
.map(escapeWithBackticks)
.join(", ")}) VALUES (${pkValues.concat(updateValues).join(", ")});`,
];
if (!forceInsert) {
result.push(`UPDATE ${this.table.name} SET ${updates} WHERE ${updateConditions};`);
}
return result;
}

/**
 * Builds the condition list (joined with " AND ") for the UPDATE statement of a row.
 *
 * The result contains one equality check per primary key, followed by a check that
 * the time column is less than or equal to the exported row's time value.
 *
 * @param row the exported row
 * @returns the conditions as a single SQL fragment (without the WHERE keyword)
 */
protected getUpdateConditions(row: any): string {
    const conditions = this.table.primaryKeys.map((pk) => `${escapeWithBackticks(pk)}=${escape(row[pk])}`);
    conditions.push(`${this.table.timeColumn}<=${escape(row[this.table.timeColumn])}`);
    return conditions.join(" AND ");
}

}

/**
 * Produces the deletion and update statements for every table of a table set.
 */
@injectable()
export class TableUpdateProvider {
    @multiInject(TableDescriptionProvider)
    protected readonly descriptionProvider: TableDescriptionProvider[];

    /**
     * Populates a TableUpdate for each table of the selected provider and collects
     * their statements.
     *
     * @param conn connection the table data is read from
     * @param tableSet name of the table description provider to use; when omitted,
     *                 the first registered provider is used
     * @param start_date passed to each TableUpdate as its start
     * @param end_date passed to each TableUpdate as its end
     * @returns deletions (in inverse table order) and updates (in sorted table order)
     * @throws Error when no matching provider is found
     */
    public async getAllStatementsForAllTables(
        conn: Connection,
        tableSet?: string,
        start_date?: Date,
        end_date?: Date,
    ): Promise<{ deletions: string[]; updates: string[] }> {
        const provider = tableSet
            ? this.descriptionProvider.find((v) => v.name == tableSet)
            : this.descriptionProvider[0];
        if (!provider) {
            throw new Error(`Unknown table set ${tableSet} or no table description providers registered`);
        }
        const tableUpdates = provider.getSortedTables().map((t) => new TableUpdate(t, start_date, end_date));
        await Promise.all(tableUpdates.map((t) => t.populate(conn)));

        // When collecting the deletions do so in the inverse order as during update (delete dependency targets first).
        // Array.prototype.reverse() works in place, so reverse a copy: otherwise the updates below
        // would be collected in reversed order as well.
        const deletions: string[] = [];
        for (const tableUpdate of tableUpdates.slice().reverse()) {
            for (const stmt of tableUpdate.deletions || []) {
                deletions.push(stmt);
            }
        }
        const updates: string[] = [];
        for (const tableUpdate of tableUpdates) {
            for (const stmt of tableUpdate.updates || []) {
                updates.push(stmt);
            }
        }
        return { deletions, updates };
    }
}

/**
 * Quotes an SQL identifier (e.g. a column name) with backticks.
 *
 * A backtick inside the identifier is doubled, so the result is still a single
 * quoted identifier. Identifiers without backticks are quoted exactly as before.
 *
 * @param val the raw identifier
 * @returns the identifier wrapped in backticks
 */
function escapeWithBackticks(val: string): string {
    return "`" + val.replace(/`/g, "``") + "`";
}
82 changes: 63 additions & 19 deletions components/ee/db-sync/src/replication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,39 +70,30 @@ export class PeriodicReplicator {
}

public async synchronize(ignoreStartDate: boolean): Promise<void> {
const now = new Date();
let previousRun = await this.getLastExportDate();
console.info(`Replicating ${this.toString()}: last ran on ${previousRun}`);
if (ignoreStartDate) {
if (previousRun && previousRun > now) {
console.warn(
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
);
}

console.info("Synchronizing complete database (ignoring previous run)");
previousRun = undefined;
} else if (previousRun && previousRun > now) {
throw new Error(
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
);
const period = await this.getSyncPeriod({ ignoreStartDate, offsetFromNowSeconds: 5 });
if (period === undefined) {
console.info("Cannot find a valid sync period; skipping this time.");
return;
}
console.info(`Replicating: syncing period ${SyncPeriod.toString(period)}`);

const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables(
this.source,
this.tableSet,
previousRun,
period.from,
period.to,
);
const deletions = modifications.deletions;
const updates = modifications.updates;
const total = [...deletions, ...updates];
console.debug(`Collected ${total.length} statements`);
try {
/* nowait */ this.logStatements(now, total);
/* nowait */ this.logStatements(period.to, total);

await Promise.all([this.source, ...this.targets].map((target) => this.update(target, deletions)));
await Promise.all(this.targets.map((target) => this.update(target, updates)));
await this.markLastExportDate(now);

await this.markLastExportDate(period.to);
} catch (err) {
console.error("Error during replication", err);
}
Expand Down Expand Up @@ -175,6 +166,47 @@ export class PeriodicReplicator {
}
}

/**
 * Determines the period to synchronize next.
 *
 * The period starts at the last export date (or is open-ended when
 * `ignoreStartDate` is set or no last export date is available) and ends at the
 * value returned by `getNextPeriodEnd`.
 *
 * @param options.ignoreStartDate when true, the last export date is discarded
 * @param options.offsetFromNowSeconds forwarded to `getNextPeriodEnd`
 * @returns the period, or `undefined` when the period end is not after the last export date
 */
protected async getSyncPeriod(options: {
    ignoreStartDate: boolean;
    offsetFromNowSeconds: number;
}): Promise<SyncPeriod | undefined> {
    const { ignoreStartDate, offsetFromNowSeconds } = options;

    // The last export date is looked up first in either case; it is merely dropped when ignored.
    const lastExport = await this.getLastExportDate();
    let from = lastExport;
    if (ignoreStartDate) {
        console.info("Synchronizing database from beginning of time (ignoring previous run)");
        from = undefined;
    }

    const to = await this.getNextPeriodEnd(offsetFromNowSeconds);
    if (from && to.getTime() <= from.getTime()) {
        // An empty or negative period: nothing sensible to sync.
        const endText = to.toISOString();
        const startText = from.toISOString();
        console.warn(
            `Period end (now - ${offsetFromNowSeconds}s) '${endText}' <= previous run '${startText}', no valid SyncPeriod.`,
        );
        return undefined;
    }

    return { to, from };
}

/**
 * Asks the source database for the end of the next sync period: its current
 * timestamp minus `offsetSeconds`.
 *
 * @param offsetSeconds how many seconds before the database's "now" the period ends
 * @returns the period end as a Date
 * @throws Error when the query yields no value, or a value that cannot be parsed as a date
 */
protected async getNextPeriodEnd(offsetSeconds: number): Promise<Date> {
    // End of sync period: now - Xs, using the DB clock.
    const rows = (await query(
        this.source,
        // CURRENT_TIMESTAMP(3) matches the precision of getLastExportDate, which is
        // (indirectly) governed by "toISOString" in "markLastExportDate".
        "SELECT SUBTIME(CURRENT_TIMESTAMP(3), SEC_TO_TIME(?)) as upperBound",
        { values: [offsetSeconds] }, // seconds in the past
    )) as { upperBound: string | undefined }[];

    // Guard against an empty result so the descriptive error below is raised instead of a TypeError.
    const upperBound = Array.isArray(rows) && rows.length > 0 ? rows[0].upperBound : undefined;
    if (!upperBound) {
        throw new Error("Unable to retrieve next period end: " + JSON.stringify(rows));
    }

    const periodEnd = new Date(upperBound);
    // Fail here with context rather than handing an Invalid Date to callers.
    if (Number.isNaN(periodEnd.getTime())) {
        throw new Error("Unable to parse next period end: " + JSON.stringify(rows));
    }
    return periodEnd;
}

protected async getLastExportDate(): Promise<Date | undefined> {
try {
const rows = (await query(
Expand Down Expand Up @@ -252,3 +284,15 @@ export class PeriodicReplicator {
return `${this.source.name} -> [ ${this.targets.map((t) => t.name).join(", ")} ]`;
}
}

/**
 * A period of time to synchronize.
 */
interface SyncPeriod {
    /** End of the period. */
    to: Date;
    /** Start of the period; `undefined` when there is no lower bound. */
    from: Date | undefined;
}

namespace SyncPeriod {
    /**
     * Renders a period as "<from> -> <to>" using ISO timestamps.
     * A missing start is printed as "---".
     */
    export function toString(p: SyncPeriod): string {
        let start = "---";
        if (p.from) {
            start = p.from.toISOString();
        }
        const end = p.to.toISOString();
        return [start, end].join(" -> ");
    }
}