@@ -1897,8 +1897,8 @@ func (cf *changeFrontier) manageProtectedTimestamps(
18971897 ctx , sp := tracing .ChildSpan (ctx , "changefeed.frontier.manage_protected_timestamps" )
18981898 defer sp .Finish ()
18991899
1900+ usePerTablePTS := cf .spec .ProgressConfig .PerTableProtectedTimestamps
19001901 ptsUpdateInterval := changefeedbase .ProtectTimestampInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1901- ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19021902 if timeutil .Since (cf .lastProtectedTimestampUpdate ) < ptsUpdateInterval {
19031903 return false , nil
19041904 }
@@ -1915,16 +1915,193 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19151915 }
19161916 }()
19171917
1918+ var ptsEntries changefeedpb.ProtectedTimestampRecords
1919+ if err := readChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , & ptsEntries , txn , cf .spec .JobID ); err != nil {
1920+ return false , err
1921+ }
1922+ pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1923+
1924+ highwater := func () hlc.Timestamp {
1925+ if cf .frontier .Frontier ().Less (cf .highWaterAtStart ) {
1926+ return cf .highWaterAtStart
1927+ }
1928+ return cf .frontier .Frontier ()
1929+ }()
1930+
1931+ if usePerTablePTS {
1932+ newPtsTimestamp , updatedPerTablePts , err := cf .managePerTableProtectedTimestamps (ctx , txn , & ptsEntries , highwater )
1933+ if err != nil {
1934+ return false , err
1935+ }
1936+ updatedMainPts , err := cf .advanceProtectedTimestamp (ctx , txn , progress , pts , newPtsTimestamp )
1937+ if err != nil {
1938+ return false , err
1939+ }
1940+ return updatedMainPts || updatedPerTablePts , nil
1941+ }
1942+
1943+ return cf .advanceProtectedTimestamp (ctx , txn , progress , pts , highwater )
1944+ }
1945+
1946+ func (cf * changeFrontier ) managePerTableProtectedTimestamps (
1947+ ctx context.Context ,
1948+ txn isql.Txn ,
1949+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
1950+ highwater hlc.Timestamp ,
1951+ ) (newPtsTimestamp hlc.Timestamp , updatedPerTablePts bool , err error ) {
1952+ var leastLaggingTimestamp hlc.Timestamp
1953+ for _ , frontier := range cf .frontier .Frontiers () {
1954+ if frontier .Frontier ().After (leastLaggingTimestamp ) {
1955+ leastLaggingTimestamp = frontier .Frontier ()
1956+ }
1957+ }
1958+
1959+ newPtsTimestamp = func () hlc.Timestamp {
1960+ lagDuration := changefeedbase .ProtectTimestampBucketingInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1961+ // If we are within the bucketing interval of having started the changefeed,
1962+ // we use the highwater as the PTS timestamp so as not to try to protect
1963+ // tables before the changefeed started.
1964+ if leastLaggingTimestamp .AddDuration (- lagDuration ).Less (highwater ) {
1965+ return highwater
1966+ }
1967+ return leastLaggingTimestamp .AddDuration (- lagDuration )
1968+ }()
1969+
19181970 pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1971+ tableIDsToRelease := make ([]descpb.ID , 0 )
1972+ for tableID , frontier := range cf .frontier .Frontiers () {
1973+ tableHighWater := func () hlc.Timestamp {
1974+ // If this table has not yet finished its initial scan, we use the highwater
1975+ // which is guaranteed to be at least the changefeed's creation time.
1976+ if frontier .Frontier ().Less (highwater ) {
1977+ return highwater
1978+ }
1979+ return frontier .Frontier ()
1980+ }()
1981+
1982+ isLagging := tableHighWater .Less (newPtsTimestamp )
1983+
1984+ if cf .knobs .IsTableLagging != nil && cf .knobs .IsTableLagging (tableID ) {
1985+ isLagging = true
1986+ }
1987+
1988+ if ! isLagging {
1989+ if ptsEntries .ProtectedTimestampRecords [tableID ] != nil {
1990+ tableIDsToRelease = append (tableIDsToRelease , tableID )
1991+ }
1992+ continue
1993+ }
19191994
1920- // Create / advance the protected timestamp record to the highwater mark
1921- highWater := cf .frontier .Frontier ()
1922- if highWater .Less (cf .highWaterAtStart ) {
1923- highWater = cf .highWaterAtStart
1995+ if ptsEntries .ProtectedTimestampRecords [tableID ] != nil {
1996+ if updated , err := cf .advancePerTableProtectedTimestampRecord (ctx , ptsEntries , tableID , tableHighWater , pts ); err != nil {
1997+ return hlc.Timestamp {}, false , err
1998+ } else if updated {
1999+ updatedPerTablePts = true
2000+ }
2001+ } else {
2002+ // TODO(#152448): Do not include system table protections in these records.
2003+ if err := cf .createPerTableProtectedTimestampRecord (ctx , txn , ptsEntries , tableID , tableHighWater , pts ); err != nil {
2004+ return hlc.Timestamp {}, false , err
2005+ }
2006+ updatedPerTablePts = true
2007+ }
2008+ }
2009+
2010+ if len (tableIDsToRelease ) > 0 {
2011+ if err := cf .releasePerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToRelease , pts ); err != nil {
2012+ return hlc.Timestamp {}, false , err
2013+ }
2014+ updatedPerTablePts = true
19242015 }
2016+
2017+ return newPtsTimestamp , updatedPerTablePts , nil
2018+ }
2019+
2020+ func (cf * changeFrontier ) releasePerTableProtectedTimestampRecords (
2021+ ctx context.Context ,
2022+ txn isql.Txn ,
2023+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2024+ tableIDs []descpb.ID ,
2025+ pts protectedts.Storage ,
2026+ ) error {
2027+ for _ , tableID := range tableIDs {
2028+ if err := pts .Release (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ]); err != nil {
2029+ return err
2030+ }
2031+ delete (ptsEntries .ProtectedTimestampRecords , tableID )
2032+ }
2033+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2034+ }
2035+
2036+ func (cf * changeFrontier ) advancePerTableProtectedTimestampRecord (
2037+ ctx context.Context ,
2038+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2039+ tableID descpb.ID ,
2040+ tableHighWater hlc.Timestamp ,
2041+ pts protectedts.Storage ,
2042+ ) (updated bool , err error ) {
2043+ rec , err := pts .GetRecord (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ])
2044+ if err != nil {
2045+ return false , err
2046+ }
2047+
2048+ ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
2049+ if rec .Timestamp .AddDuration (ptsUpdateLag ).After (tableHighWater ) {
2050+ return false , nil
2051+ }
2052+
2053+ if err := pts .UpdateTimestamp (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ], tableHighWater ); err != nil {
2054+ return false , err
2055+ }
2056+ return true , nil
2057+ }
2058+
2059+ func (cf * changeFrontier ) createPerTableProtectedTimestampRecord (
2060+ ctx context.Context ,
2061+ txn isql.Txn ,
2062+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2063+ tableID descpb.ID ,
2064+ tableHighWater hlc.Timestamp ,
2065+ pts protectedts.Storage ,
2066+ ) error {
2067+ // If the table is lagging and doesn't have a per table PTS record,
2068+ // we create a new one.
2069+ targets := changefeedbase.Targets {}
2070+ if cf .targets .Size > 0 {
2071+ err := cf .targets .EachTarget (func (target changefeedbase.Target ) error {
2072+ if target .DescID == tableID {
2073+ targets .Add (target )
2074+ }
2075+ return nil
2076+ })
2077+ if err != nil {
2078+ return err
2079+ }
2080+ }
2081+ ptr := createProtectedTimestampRecord (
2082+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
2083+ )
2084+ if ptsEntries .ProtectedTimestampRecords == nil {
2085+ ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]* uuid.UUID )
2086+ }
2087+ uuid := ptr .ID .GetUUID ()
2088+ ptsEntries .ProtectedTimestampRecords [tableID ] = & uuid
2089+ if err := pts .Protect (ctx , ptr ); err != nil {
2090+ return err
2091+ }
2092+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2093+ }
2094+
2095+ func (cf * changeFrontier ) advanceProtectedTimestamp (
2096+ ctx context.Context ,
2097+ txn isql.Txn ,
2098+ progress * jobspb.ChangefeedProgress ,
2099+ pts protectedts.Storage ,
2100+ timestamp hlc.Timestamp ,
2101+ ) (updated bool , err error ) {
19252102 if progress .ProtectedTimestampRecord == uuid .Nil {
19262103 ptr := createProtectedTimestampRecord (
1927- ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , highWater ,
2104+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , timestamp ,
19282105 )
19292106 progress .ProtectedTimestampRecord = ptr .ID .GetUUID ()
19302107 return true , pts .Protect (ctx , ptr )
@@ -1942,7 +2119,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19422119 if preserveDeprecatedPts := cf .knobs .PreserveDeprecatedPts != nil && cf .knobs .PreserveDeprecatedPts (); preserveDeprecatedPts {
19432120 return false , nil
19442121 }
1945- if err := cf .remakePTSRecord (ctx , pts , progress , highWater ); err != nil {
2122+ if err := cf .remakePTSRecord (ctx , pts , progress , timestamp ); err != nil {
19462123 return false , err
19472124 }
19482125 return true , nil
@@ -1955,26 +2132,28 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19552132 if preservePTSTargets := cf .knobs .PreservePTSTargets != nil && cf .knobs .PreservePTSTargets (); preservePTSTargets {
19562133 return false , nil
19572134 }
1958- if err := cf .remakePTSRecord (ctx , pts , progress , highWater ); err != nil {
2135+ if err := cf .remakePTSRecord (ctx , pts , progress , timestamp ); err != nil {
19592136 return false , err
19602137 }
2138+ log .VEventf (ctx , 2 , "remade PTS record %v to include all targets" , progress .ProtectedTimestampRecord )
19612139 return true , nil
19622140 }
19632141
2142+ ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19642143 // Only update the PTS timestamp if it is lagging behind the high
19652144 // watermark. This is to prevent a rush of updates to the PTS if the
19662145 // changefeed restarts, which can cause contention and second order effects
19672146 // on system tables.
1968- if ! rec .Timestamp .AddDuration (ptsUpdateLag ).Less ( highWater ) {
2147+ if rec .Timestamp .AddDuration (ptsUpdateLag ).After ( timestamp ) {
19692148 return false , nil
19702149 }
19712150
19722151 if cf .knobs .ManagePTSError != nil {
19732152 return false , cf .knobs .ManagePTSError ()
19742153 }
19752154
1976- log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , highWater )
1977- return true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , highWater )
2155+ log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , timestamp )
2156+ return true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , timestamp )
19782157}
19792158
19802159func (cf * changeFrontier ) remakePTSRecord (
0 commit comments