@@ -1898,7 +1898,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
18981898 defer sp .Finish ()
18991899
19001900 ptsUpdateInterval := changefeedbase .ProtectTimestampInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1901- ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19021901 if timeutil .Since (cf .lastProtectedTimestampUpdate ) < ptsUpdateInterval {
19031902 return false , nil
19041903 }
@@ -1915,16 +1914,193 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19151914 }
19161915 }()
19171916
1917+ var ptsEntries changefeedpb.ProtectedTimestampRecords
1918+ if err := readChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , & ptsEntries , txn , cf .spec .JobID ); err != nil {
1919+ return false , err
1920+ }
1921+ pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1922+
1923+ highwater := func () hlc.Timestamp {
1924+ if cf .frontier .Frontier ().Less (cf .highWaterAtStart ) {
1925+ return cf .highWaterAtStart
1926+ }
1927+ return cf .frontier .Frontier ()
1928+ }()
1929+
1930+ if cf .spec .ProgressConfig .PerTableProtectedTimestamps {
1931+ newPTS , updatedPerTablePTS , err := cf .managePerTableProtectedTimestamps (ctx , txn , & ptsEntries , highwater )
1932+ if err != nil {
1933+ return false , err
1934+ }
1935+ updatedMainPTS , err := cf .advanceProtectedTimestamp (ctx , progress , pts , newPTS )
1936+ if err != nil {
1937+ return false , err
1938+ }
1939+ return updatedMainPTS || updatedPerTablePTS , nil
1940+ }
1941+
1942+ return cf .advanceProtectedTimestamp (ctx , progress , pts , highwater )
1943+ }
1944+
1945+ func (cf * changeFrontier ) managePerTableProtectedTimestamps (
1946+ ctx context.Context ,
1947+ txn isql.Txn ,
1948+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
1949+ highwater hlc.Timestamp ,
1950+ ) (newPTS hlc.Timestamp , updatedPerTablePTS bool , err error ) {
1951+ var leastLaggingTimestamp hlc.Timestamp
1952+ for _ , frontier := range cf .frontier .Frontiers () {
1953+ if frontier .Frontier ().After (leastLaggingTimestamp ) {
1954+ leastLaggingTimestamp = frontier .Frontier ()
1955+ }
1956+ }
1957+
1958+ newPTS = func () hlc.Timestamp {
1959+ lagDuration := changefeedbase .ProtectTimestampBucketingInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1960+ ptsLagCutoff := leastLaggingTimestamp .AddDuration (- lagDuration )
1961+ // If we are within the bucketing interval of having started the changefeed,
1962+ // we use the highwater as the PTS timestamp so as not to try to protect
1963+ // tables before the changefeed started.
1964+ if ptsLagCutoff .Less (highwater ) {
1965+ return highwater
1966+ }
1967+ return ptsLagCutoff
1968+ }()
1969+
19181970 pts := cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1971+ tableIDsToRelease := make ([]descpb.ID , 0 )
1972+ for tableID , frontier := range cf .frontier .Frontiers () {
1973+ tableHighWater := func () hlc.Timestamp {
1974+ // If this table has not yet finished its initial scan, we use the highwater
1975+ // which is guaranteed to be at least the changefeed's creation time.
1976+ if frontier .Frontier ().Less (highwater ) {
1977+ return highwater
1978+ }
1979+ return frontier .Frontier ()
1980+ }()
1981+
1982+ isLagging := tableHighWater .Less (newPTS )
1983+
1984+ if cf .knobs .IsTableLagging != nil && cf .knobs .IsTableLagging (tableID ) {
1985+ isLagging = true
1986+ }
1987+
1988+ if ! isLagging {
1989+ if ptsEntries .ProtectedTimestampRecords [tableID ] != nil {
1990+ tableIDsToRelease = append (tableIDsToRelease , tableID )
1991+ }
1992+ continue
1993+ }
19191994
1920- // Create / advance the protected timestamp record to the highwater mark
1921- highWater := cf .frontier .Frontier ()
1922- if highWater .Less (cf .highWaterAtStart ) {
1923- highWater = cf .highWaterAtStart
1995+ if ptsEntries .ProtectedTimestampRecords [tableID ] != nil {
1996+ if updated , err := cf .advancePerTableProtectedTimestampRecord (ctx , ptsEntries , tableID , tableHighWater , pts ); err != nil {
1997+ return hlc.Timestamp {}, false , err
1998+ } else if updated {
1999+ updatedPerTablePTS = true
2000+ }
2001+ } else {
2002+ // TODO(#152448): Do not include system table protections in these records.
2003+ if err := cf .createPerTableProtectedTimestampRecord (ctx , txn , ptsEntries , tableID , tableHighWater , pts ); err != nil {
2004+ return hlc.Timestamp {}, false , err
2005+ }
2006+ updatedPerTablePTS = true
2007+ }
2008+ }
2009+
2010+ if len (tableIDsToRelease ) > 0 {
2011+ if err := cf .releasePerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToRelease , pts ); err != nil {
2012+ return hlc.Timestamp {}, false , err
2013+ }
2014+ updatedPerTablePTS = true
19242015 }
2016+
2017+ return newPTS , updatedPerTablePTS , nil
2018+ }
2019+
2020+ func (cf * changeFrontier ) releasePerTableProtectedTimestampRecords (
2021+ ctx context.Context ,
2022+ txn isql.Txn ,
2023+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2024+ tableIDs []descpb.ID ,
2025+ pts protectedts.Storage ,
2026+ ) error {
2027+ for _ , tableID := range tableIDs {
2028+ if err := pts .Release (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ]); err != nil {
2029+ return err
2030+ }
2031+ delete (ptsEntries .ProtectedTimestampRecords , tableID )
2032+ }
2033+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2034+ }
2035+
2036+ func (cf * changeFrontier ) advancePerTableProtectedTimestampRecord (
2037+ ctx context.Context ,
2038+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2039+ tableID descpb.ID ,
2040+ tableHighWater hlc.Timestamp ,
2041+ pts protectedts.Storage ,
2042+ ) (updated bool , err error ) {
2043+ rec , err := pts .GetRecord (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ])
2044+ if err != nil {
2045+ return false , err
2046+ }
2047+
2048+ ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
2049+ if rec .Timestamp .AddDuration (ptsUpdateLag ).After (tableHighWater ) {
2050+ return false , nil
2051+ }
2052+
2053+ if err := pts .UpdateTimestamp (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ], tableHighWater ); err != nil {
2054+ return false , err
2055+ }
2056+ return true , nil
2057+ }
2058+
2059+ func (cf * changeFrontier ) createPerTableProtectedTimestampRecord (
2060+ ctx context.Context ,
2061+ txn isql.Txn ,
2062+ ptsEntries * changefeedpb.ProtectedTimestampRecords ,
2063+ tableID descpb.ID ,
2064+ tableHighWater hlc.Timestamp ,
2065+ pts protectedts.Storage ,
2066+ ) error {
2067+ // If the table is lagging and doesn't have a per table PTS record,
2068+ // we create a new one.
2069+ targets := changefeedbase.Targets {}
2070+ if cf .targets .Size > 0 {
2071+ err := cf .targets .EachTarget (func (target changefeedbase.Target ) error {
2072+ if target .DescID == tableID {
2073+ targets .Add (target )
2074+ }
2075+ return nil
2076+ })
2077+ if err != nil {
2078+ return err
2079+ }
2080+ }
2081+ ptr := createProtectedTimestampRecord (
2082+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
2083+ )
2084+ if ptsEntries .ProtectedTimestampRecords == nil {
2085+ ptsEntries .ProtectedTimestampRecords = make (map [descpb.ID ]* uuid.UUID )
2086+ }
2087+ uuid := ptr .ID .GetUUID ()
2088+ ptsEntries .ProtectedTimestampRecords [tableID ] = & uuid
2089+ if err := pts .Protect (ctx , ptr ); err != nil {
2090+ return err
2091+ }
2092+ return writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2093+ }
2094+
2095+ func (cf * changeFrontier ) advanceProtectedTimestamp (
2096+ ctx context.Context ,
2097+ progress * jobspb.ChangefeedProgress ,
2098+ pts protectedts.Storage ,
2099+ timestamp hlc.Timestamp ,
2100+ ) (updated bool , err error ) {
19252101 if progress .ProtectedTimestampRecord == uuid .Nil {
19262102 ptr := createProtectedTimestampRecord (
1927- ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , highWater ,
2103+ ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , timestamp ,
19282104 )
19292105 progress .ProtectedTimestampRecord = ptr .ID .GetUUID ()
19302106 return true , pts .Protect (ctx , ptr )
@@ -1942,7 +2118,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19422118 if preserveDeprecatedPts := cf .knobs .PreserveDeprecatedPts != nil && cf .knobs .PreserveDeprecatedPts (); preserveDeprecatedPts {
19432119 return false , nil
19442120 }
1945- if err := cf .remakePTSRecord (ctx , pts , progress , highWater ); err != nil {
2121+ if err := cf .remakePTSRecord (ctx , pts , progress , timestamp ); err != nil {
19462122 return false , err
19472123 }
19482124 return true , nil
@@ -1955,26 +2131,28 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19552131 if preservePTSTargets := cf .knobs .PreservePTSTargets != nil && cf .knobs .PreservePTSTargets (); preservePTSTargets {
19562132 return false , nil
19572133 }
1958- if err := cf .remakePTSRecord (ctx , pts , progress , highWater ); err != nil {
2134+ if err := cf .remakePTSRecord (ctx , pts , progress , timestamp ); err != nil {
19592135 return false , err
19602136 }
2137+ log .VEventf (ctx , 2 , "remade PTS record %v to include all targets" , progress .ProtectedTimestampRecord )
19612138 return true , nil
19622139 }
19632140
2141+ ptsUpdateLag := changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19642142 // Only update the PTS timestamp if it is lagging behind the high
19652143 // watermark. This is to prevent a rush of updates to the PTS if the
19662144 // changefeed restarts, which can cause contention and second order effects
19672145 // on system tables.
1968- if ! rec .Timestamp .AddDuration (ptsUpdateLag ).Less ( highWater ) {
2146+ if rec .Timestamp .AddDuration (ptsUpdateLag ).After ( timestamp ) {
19692147 return false , nil
19702148 }
19712149
19722150 if cf .knobs .ManagePTSError != nil {
19732151 return false , cf .knobs .ManagePTSError ()
19742152 }
19752153
1976- log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , highWater )
1977- return true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , highWater )
2154+ log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , timestamp )
2155+ return true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , timestamp )
19782156}
19792157
19802158func (cf * changeFrontier ) remakePTSRecord (
0 commit comments