Skip to content

Commit 6c50fa0

Browse files
committed
Rework handling of running transactions by extensions.
* RunningTransactionsExtension structure as a part of xl_running_xacts and RunningTransactionsData to be filled by extensions. * getRunningTransactionsExtension hook to fill RunningTransactionsExtension. * Add nextXid field to CSNSnapshotData. * SnapBuildGetCSNSnaphot() function to modify CSNSnapshotData in the SnapBuild. * waitSnapshotHook hook to wait for transactions inside the snapshot builder.
1 parent a54a771 commit 6c50fa0

File tree

8 files changed

+69
-19
lines changed

8 files changed

+69
-19
lines changed

src/backend/replication/logical/snapbuild.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,8 +1280,9 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
12801280
ReorderBufferTXN *txn;
12811281
TransactionId xmin;
12821282

1283-
builder->csnSnapshotData.snapshotcsn = running->csn;
1284-
builder->csnSnapshotData.xmin = 0;
1283+
builder->csnSnapshotData.snapshotcsn = running->extension.csn;
1284+
builder->csnSnapshotData.xmin = running->extension.runXmin;
1285+
builder->csnSnapshotData.nextXid = running->extension.nextXid;
12851286
builder->csnSnapshotData.xlogptr = lsn;
12861287

12871288
/*
@@ -1311,6 +1312,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
13111312
* we hit fast paths in heapam_visibility.c.
13121313
*/
13131314
builder->xmin = running->oldestRunningXid;
1315+
builder->csnSnapshotData.snapshotcsn = running->extension.csn;
1316+
builder->csnSnapshotData.xmin = running->extension.runXmin;
1317+
builder->csnSnapshotData.nextXid = running->extension.nextXid;
1318+
builder->csnSnapshotData.xlogptr = lsn;
13141319

13151320
/* Remove transactions we don't need to keep track off anymore */
13161321
SnapBuildPurgeOlderTxn(builder);
@@ -1439,7 +1444,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
14391444
* NB: We might have already started to incrementally assemble a snapshot,
14401445
* so we need to be careful to deal with that.
14411446
*/
1442-
if (running->oldestRunningXid == running->nextXid)
1447+
if (running->oldestRunningXid == running->nextXid &&
1448+
running->extension.runXmin == running->extension.nextXid)
14431449
{
14441450
if (builder->start_decoding_at == InvalidXLogRecPtr ||
14451451
builder->start_decoding_at <= lsn)
@@ -1568,6 +1574,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
15681574
return true;
15691575
}
15701576

1577+
/*
1578+
* Hook for custom waits in SnapBuildWaitSnapshot() provided by extensions.
1579+
*/
1580+
WaitSnapshotHookType waitSnapshotHook = NULL;
1581+
15711582
/* ---
15721583
* Iterate through xids in record, wait for all older than the cutoff to
15731584
* finish. Then, if possible, log a new xl_running_xacts record.
@@ -1602,6 +1613,12 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
16021613
XactLockTableWait(xid, NULL, NULL, XLTW_None);
16031614
}
16041615

1616+
/*
1617+
* Give extensions chance for their custom waits.
1618+
*/
1619+
if (waitSnapshotHook)
1620+
waitSnapshotHook(&running->extension);
1621+
16051622
/*
16061623
* All transactions we needed to finish finished - try to ensure there is
16071624
* another xl_running_xacts record in a timely manner, without having to
@@ -2209,9 +2226,8 @@ CheckPointSnapBuild(void)
22092226
FreeDir(snap_dir);
22102227
}
22112228

2212-
void
2213-
SnapBuildUpdateCSNSnaphot(SnapBuild *builder,
2214-
CSNSnapshotData *csnSnapshotData)
2229+
CSNSnapshotData *
2230+
SnapBuildGetCSNSnaphot(SnapBuild *builder)
22152231
{
2216-
builder->csnSnapshotData = *csnSnapshotData;
2232+
return &builder->csnSnapshotData;
22172233
}

src/backend/storage/ipc/procarray.c

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2699,6 +2699,11 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
26992699
return result;
27002700
}
27012701

2702+
/*
2703+
* A hook for filling RunningTransactionsExtension structure by extensions.
2704+
*/
2705+
GetRunningTransactionsExtensionHookType getRunningTransactionsExtension = NULL;
2706+
27022707
/*
27032708
* GetRunningTransactionData -- returns information about running transactions.
27042709
*
@@ -2880,7 +2885,20 @@ GetRunningTransactionData(void)
28802885
CurrentRunningXacts->nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
28812886
CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
28822887
CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
2883-
CurrentRunningXacts->csn = pg_atomic_read_u64(&ShmemVariableCache->nextCommitSeqNo);
2888+
2889+
/*
2890+
* Give extensions chance to fill their structs.
2891+
*/
2892+
if (getRunningTransactionsExtension)
2893+
{
2894+
getRunningTransactionsExtension(&CurrentRunningXacts->extension);
2895+
}
2896+
else
2897+
{
2898+
CurrentRunningXacts->extension.csn = 0;
2899+
CurrentRunningXacts->extension.nextXid = 0;
2900+
CurrentRunningXacts->extension.runXmin = 0;
2901+
}
28842902

28852903
Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
28862904
Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));

src/backend/storage/ipc/standby.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1355,7 +1355,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
13551355
xlrec.nextXid = CurrRunningXacts->nextXid;
13561356
xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
13571357
xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
1358-
xlrec.csn = CurrRunningXacts->csn;
1358+
xlrec.extension = CurrRunningXacts->extension;
13591359

13601360
/* Header */
13611361
XLogBeginInsert();

src/backend/utils/time/snapmgr.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2194,9 +2194,7 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
21942194
serialized_snapshot.curcid = snapshot->curcid;
21952195
serialized_snapshot.whenTaken = snapshot->whenTaken;
21962196
serialized_snapshot.lsn = snapshot->lsn;
2197-
serialized_snapshot.csnSnapshotData.xmin = snapshot->csnSnapshotData.xmin;
2198-
serialized_snapshot.csnSnapshotData.snapshotcsn = snapshot->csnSnapshotData.snapshotcsn;
2199-
serialized_snapshot.csnSnapshotData.xlogptr = snapshot->csnSnapshotData.xlogptr;
2197+
serialized_snapshot.csnSnapshotData = snapshot->csnSnapshotData;
22002198
serialized_snapshot.undoRegularRowLocation = snapshot->undoRegularRowLocationPhNode.undoLocation;
22012199
serialized_snapshot.undoRegularPageLocation = snapshot->undoRegularPageLocationPhNode.undoLocation;
22022200
serialized_snapshot.undoSystemLocation = snapshot->undoSystemLocationPhNode.undoLocation;
@@ -2275,9 +2273,7 @@ RestoreSnapshot(char *start_address)
22752273
snapshot->whenTaken = serialized_snapshot.whenTaken;
22762274
snapshot->lsn = serialized_snapshot.lsn;
22772275
snapshot->snapXactCompletionCount = 0;
2278-
snapshot->csnSnapshotData.xmin = serialized_snapshot.csnSnapshotData.xmin;
2279-
snapshot->csnSnapshotData.snapshotcsn = serialized_snapshot.csnSnapshotData.snapshotcsn;
2280-
snapshot->csnSnapshotData.xlogptr = serialized_snapshot.csnSnapshotData.xlogptr;
2276+
snapshot->csnSnapshotData = serialized_snapshot.csnSnapshotData;
22812277
snapshot->undoRegularRowLocationPhNode.undoLocation = serialized_snapshot.undoRegularRowLocation;
22822278
snapshot->undoRegularPageLocationPhNode.undoLocation = serialized_snapshot.undoRegularPageLocation;
22832279
snapshot->undoSystemLocationPhNode.undoLocation = serialized_snapshot.undoSystemLocation;

src/include/replication/snapbuild.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#define SNAPBUILD_H
1414

1515
#include "access/xlogdefs.h"
16+
#include "storage/standbydefs.h"
1617
#include "utils/snapmgr.h"
1718

1819
typedef enum
@@ -57,6 +58,10 @@ struct ReorderBuffer;
5758
struct xl_heap_new_cid;
5859
struct xl_running_xacts;
5960

61+
typedef void (*WaitSnapshotHookType) (RunningTransactionsExtension *extension);
62+
63+
extern PGDLLIMPORT WaitSnapshotHookType waitSnapshotHook;
64+
6065
extern void CheckPointSnapBuild(void);
6166

6267
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
@@ -91,7 +96,6 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
9196
extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
9297
struct xl_running_xacts *running);
9398
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
94-
extern void SnapBuildUpdateCSNSnaphot(SnapBuild *builder,
95-
CSNSnapshotData *csnSnapshotData);
99+
extern CSNSnapshotData *SnapBuildGetCSNSnaphot(SnapBuild *builder);
96100

97101
#endif /* SNAPBUILD_H */

src/include/storage/standby.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,17 @@ typedef struct RunningTransactionsData
9191
TransactionId nextXid; /* xid from ShmemVariableCache->nextXid */
9292
TransactionId oldestRunningXid; /* *not* oldestXmin */
9393
TransactionId latestCompletedXid; /* so we can set xmax */
94-
CommitSeqNo csn; /* current csn */
94+
RunningTransactionsExtension extension;
9595

9696
TransactionId *xids; /* array of (sub)xids still running */
9797
} RunningTransactionsData;
9898

9999
typedef RunningTransactionsData *RunningTransactions;
100100

101+
typedef void (*GetRunningTransactionsExtensionHookType) (RunningTransactionsExtension *extension);
102+
103+
extern PGDLLIMPORT GetRunningTransactionsExtensionHookType getRunningTransactionsExtension;
104+
101105
extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
102106
extern void LogAccessExclusiveLockPrepare(void);
103107

src/include/storage/standbydefs.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ typedef struct xl_standby_locks
4141
xl_standby_lock locks[FLEXIBLE_ARRAY_MEMBER];
4242
} xl_standby_locks;
4343

44+
/*
45+
* A part of xl_running_xacts and RunningTransactionsData to be filled by
46+
* extensions.
47+
*/
48+
typedef struct
49+
{
50+
uint64 nextXid;
51+
uint64 runXmin;
52+
CommitSeqNo csn; /* current csn */
53+
} RunningTransactionsExtension;
54+
4455
/*
4556
* When we write running xact data to WAL, we use this structure.
4657
*/
@@ -52,7 +63,7 @@ typedef struct xl_running_xacts
5263
TransactionId nextXid; /* xid from ShmemVariableCache->nextXid */
5364
TransactionId oldestRunningXid; /* *not* oldestXmin */
5465
TransactionId latestCompletedXid; /* so we can set xmax */
55-
CommitSeqNo csn; /* current csn */
66+
RunningTransactionsExtension extension;
5667

5768
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
5869
} xl_running_xacts;

src/include/utils/snapshot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ typedef struct
131131
typedef struct CSNSnapshotData
132132
{
133133
uint64 xmin;
134+
uint64 nextXid;
134135
CommitSeqNo snapshotcsn;
135136
XLogRecPtr xlogptr;
136137
} CSNSnapshotData;

0 commit comments

Comments
 (0)