Skip to content

Commit e895705

Browse files
committed
[db-sync] Cap upper limit of sync periodto now - 5s and use DB clock for this
1 parent cc233e4 commit e895705

File tree

1 file changed

+62
-19
lines changed

1 file changed

+62
-19
lines changed

components/ee/db-sync/src/replication.ts

Lines changed: 62 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,39 +70,32 @@ export class PeriodicReplicator {
7070
}
7171

7272
public async synchronize(ignoreStartDate: boolean): Promise<void> {
73-
const now = new Date();
74-
let previousRun = await this.getLastExportDate();
75-
console.info(`Replicating ${this.toString()}: last ran on ${previousRun}`);
76-
if (ignoreStartDate) {
77-
if (previousRun && previousRun > now) {
78-
console.warn(
79-
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
80-
);
81-
}
82-
83-
console.info("Synchronizing complete database (ignoring previous run)");
84-
previousRun = undefined;
85-
} else if (previousRun && previousRun > now) {
86-
throw new Error(
87-
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
88-
);
73+
let period: SyncPeriod;
74+
try {
75+
period = await this.getSyncPeriod({ ignoreStartDate, offsetFromNowSeconds: 5 });
76+
} catch (err) {
77+
console.info("Cannot find a valid sync period; skipping this time.", err);
78+
return;
8979
}
80+
console.info(`Replicating: syncing period ${SyncPeriod.toString(period)}`);
9081

9182
const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables(
9283
this.source,
9384
this.tableSet,
94-
previousRun,
85+
period.from,
86+
period.to,
9587
);
9688
const deletions = modifications.deletions;
9789
const updates = modifications.updates;
9890
const total = [...deletions, ...updates];
9991
console.debug(`Collected ${total.length} statements`);
10092
try {
101-
/* nowait */ this.logStatements(now, total);
93+
/* nowait */ this.logStatements(period.to, total);
10294

10395
await Promise.all([this.source, ...this.targets].map((target) => this.update(target, deletions)));
10496
await Promise.all(this.targets.map((target) => this.update(target, updates)));
105-
await this.markLastExportDate(now);
97+
98+
await this.markLastExportDate(period.to);
10699
} catch (err) {
107100
console.error("Error during replication", err);
108101
}
@@ -175,6 +168,44 @@ export class PeriodicReplicator {
175168
}
176169
}
177170

171+
protected async getSyncPeriod(options: {
172+
ignoreStartDate: boolean;
173+
offsetFromNowSeconds: number;
174+
}): Promise<SyncPeriod> {
175+
let previousRun = await this.getLastExportDate();
176+
if (options.ignoreStartDate) {
177+
console.info("Synchronizing database from beginning of time (ignoring previous run)");
178+
previousRun = undefined;
179+
}
180+
181+
const periodEnd = await this.getNextPeriodEnd(options.offsetFromNowSeconds);
182+
if (previousRun && periodEnd.getTime() <= previousRun.getTime()) {
183+
throw new Error(
184+
`Cannot find a period [${previousRun.toISOString()}, now - ${options.offsetFromNowSeconds}s)]`,
185+
);
186+
}
187+
188+
return {
189+
to: periodEnd,
190+
from: previousRun,
191+
};
192+
}
193+
194+
protected async getNextPeriodEnd(offsetSeconds: number): Promise<Date> {
195+
// End of syncperiod: now - Xs, DB clock
196+
const rows = (await query(
197+
this.source,
198+
// Why CURRENT_TIMESTAMP(3) you ask? Well, because we want to match the precision of getLastExportDate, which is (indirectly) governed by "toISOString" in "markLastExportDate" below.
199+
"SELECT SUBTIME(CURRENT_TIMESTAMP(3), SEC_TO_TIME(?)) as upperBound",
200+
{ values: [offsetSeconds] }, // seconds in the past
201+
)) as { upperBound: string | undefined }[];
202+
const upperBound = rows[0].upperBound;
203+
if (!upperBound) {
204+
throw new Error("Unable to retrieve next period end: " + JSON.stringify(rows));
205+
}
206+
return new Date(upperBound);
207+
}
208+
178209
protected async getLastExportDate(): Promise<Date | undefined> {
179210
try {
180211
const rows = (await query(
@@ -252,3 +283,15 @@ export class PeriodicReplicator {
252283
return `${this.source.name} -> [ ${this.targets.map((t) => t.name).join(", ")} ]`;
253284
}
254285
}
286+
287+
interface SyncPeriod {
288+
to: Date;
289+
from: Date | undefined;
290+
}
291+
292+
namespace SyncPeriod {
293+
export function toString(p: SyncPeriod): string {
294+
const from = p.from ? p.from.toISOString() : "---";
295+
return `${from} -> ${p.to.toISOString()}`;
296+
}
297+
}

0 commit comments

Comments
 (0)