@@ -1898,7 +1898,6 @@ func (cf *changeFrontier) manageProtectedTimestamps(
18981898	defer  sp .Finish ()
18991899
19001900	ptsUpdateInterval  :=  changefeedbase .ProtectTimestampInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1901- 	ptsUpdateLag  :=  changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19021901	if  timeutil .Since (cf .lastProtectedTimestampUpdate ) <  ptsUpdateInterval  {
19031902		return  false , nil 
19041903	}
@@ -1915,16 +1914,193 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19151914		}
19161915	}()
19171916
1917+ 	var  ptsEntries  changefeedpb.ProtectedTimestampRecords 
1918+ 	if  err  :=  readChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , & ptsEntries , txn , cf .spec .JobID ); err  !=  nil  {
1919+ 		return  false , err 
1920+ 	}
1921+ 	pts  :=  cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1922+ 
1923+ 	highwater  :=  func () hlc.Timestamp  {
1924+ 		if  cf .frontier .Frontier ().Less (cf .highWaterAtStart ) {
1925+ 			return  cf .highWaterAtStart 
1926+ 		}
1927+ 		return  cf .frontier .Frontier ()
1928+ 	}()
1929+ 
1930+ 	if  cf .spec .ProgressConfig .PerTableProtectedTimestamps  {
1931+ 		newPTS , updatedPerTablePTS , err  :=  cf .managePerTableProtectedTimestamps (ctx , txn , & ptsEntries , highwater )
1932+ 		if  err  !=  nil  {
1933+ 			return  false , err 
1934+ 		}
1935+ 		updatedMainPTS , err  :=  cf .advanceProtectedTimestamp (ctx , progress , pts , newPTS )
1936+ 		if  err  !=  nil  {
1937+ 			return  false , err 
1938+ 		}
1939+ 		return  updatedMainPTS  ||  updatedPerTablePTS , nil 
1940+ 	}
1941+ 
1942+ 	return  cf .advanceProtectedTimestamp (ctx , progress , pts , highwater )
1943+ }
1944+ 
1945+ func  (cf  * changeFrontier ) managePerTableProtectedTimestamps (
1946+ 	ctx  context.Context ,
1947+ 	txn  isql.Txn ,
1948+ 	ptsEntries  * changefeedpb.ProtectedTimestampRecords ,
1949+ 	highwater  hlc.Timestamp ,
1950+ ) (newPTS  hlc.Timestamp , updatedPerTablePTS  bool , err  error ) {
1951+ 	var  leastLaggingTimestamp  hlc.Timestamp 
1952+ 	for  _ , frontier  :=  range  cf .frontier .Frontiers () {
1953+ 		if  frontier .Frontier ().After (leastLaggingTimestamp ) {
1954+ 			leastLaggingTimestamp  =  frontier .Frontier ()
1955+ 		}
1956+ 	}
1957+ 
1958+ 	newPTS  =  func () hlc.Timestamp  {
1959+ 		lagDuration  :=  changefeedbase .ProtectTimestampBucketingInterval .Get (& cf .FlowCtx .Cfg .Settings .SV )
1960+ 		ptsLagCutoff  :=  leastLaggingTimestamp .AddDuration (- lagDuration )
1961+ 		// If we are within the bucketing interval of having started the changefeed, 
1962+ 		// we use the highwater as the PTS timestamp so as not to try to protect 
1963+ 		// tables before the changefeed started. 
1964+ 		if  ptsLagCutoff .Less (highwater ) {
1965+ 			return  highwater 
1966+ 		}
1967+ 		return  ptsLagCutoff 
1968+ 	}()
1969+ 
19181970	pts  :=  cf .FlowCtx .Cfg .ProtectedTimestampProvider .WithTxn (txn )
1971+ 	tableIDsToRelease  :=  make ([]descpb.ID , 0 )
1972+ 	for  tableID , frontier  :=  range  cf .frontier .Frontiers () {
1973+ 		tableHighWater  :=  func () hlc.Timestamp  {
1974+ 			// If this table has not yet finished its initial scan, we use the highwater 
1975+ 			// which is guaranteed to be at least the changefeed's creation time. 
1976+ 			if  frontier .Frontier ().Less (highwater ) {
1977+ 				return  highwater 
1978+ 			}
1979+ 			return  frontier .Frontier ()
1980+ 		}()
1981+ 
1982+ 		isLagging  :=  tableHighWater .Less (newPTS )
1983+ 
1984+ 		if  cf .knobs .IsTableLagging  !=  nil  &&  cf .knobs .IsTableLagging (tableID ) {
1985+ 			isLagging  =  true 
1986+ 		}
1987+ 
1988+ 		if  ! isLagging  {
1989+ 			if  ptsEntries .ProtectedTimestampRecords [tableID ] !=  nil  {
1990+ 				tableIDsToRelease  =  append (tableIDsToRelease , tableID )
1991+ 			}
1992+ 			continue 
1993+ 		}
19191994
1920- 	// Create / advance the protected timestamp record to the highwater mark 
1921- 	highWater  :=  cf .frontier .Frontier ()
1922- 	if  highWater .Less (cf .highWaterAtStart ) {
1923- 		highWater  =  cf .highWaterAtStart 
1995+ 		if  ptsEntries .ProtectedTimestampRecords [tableID ] !=  nil  {
1996+ 			if  updated , err  :=  cf .advancePerTableProtectedTimestampRecord (ctx , ptsEntries , tableID , tableHighWater , pts ); err  !=  nil  {
1997+ 				return  hlc.Timestamp {}, false , err 
1998+ 			} else  if  updated  {
1999+ 				updatedPerTablePTS  =  true 
2000+ 			}
2001+ 		} else  {
2002+ 			// TODO(#152448): Do not include system table protections in these records. 
2003+ 			if  err  :=  cf .createPerTableProtectedTimestampRecord (ctx , txn , ptsEntries , tableID , tableHighWater , pts ); err  !=  nil  {
2004+ 				return  hlc.Timestamp {}, false , err 
2005+ 			}
2006+ 			updatedPerTablePTS  =  true 
2007+ 		}
2008+ 	}
2009+ 
2010+ 	if  len (tableIDsToRelease ) >  0  {
2011+ 		if  err  :=  cf .releasePerTableProtectedTimestampRecords (ctx , txn , ptsEntries , tableIDsToRelease , pts ); err  !=  nil  {
2012+ 			return  hlc.Timestamp {}, false , err 
2013+ 		}
2014+ 		updatedPerTablePTS  =  true 
19242015	}
2016+ 
2017+ 	return  newPTS , updatedPerTablePTS , nil 
2018+ }
2019+ 
2020+ func  (cf  * changeFrontier ) releasePerTableProtectedTimestampRecords (
2021+ 	ctx  context.Context ,
2022+ 	txn  isql.Txn ,
2023+ 	ptsEntries  * changefeedpb.ProtectedTimestampRecords ,
2024+ 	tableIDs  []descpb.ID ,
2025+ 	pts  protectedts.Storage ,
2026+ ) error  {
2027+ 	for  _ , tableID  :=  range  tableIDs  {
2028+ 		if  err  :=  pts .Release (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ]); err  !=  nil  {
2029+ 			return  err 
2030+ 		}
2031+ 		delete (ptsEntries .ProtectedTimestampRecords , tableID )
2032+ 	}
2033+ 	return  writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2034+ }
2035+ 
2036+ func  (cf  * changeFrontier ) advancePerTableProtectedTimestampRecord (
2037+ 	ctx  context.Context ,
2038+ 	ptsEntries  * changefeedpb.ProtectedTimestampRecords ,
2039+ 	tableID  descpb.ID ,
2040+ 	tableHighWater  hlc.Timestamp ,
2041+ 	pts  protectedts.Storage ,
2042+ ) (updated  bool , err  error ) {
2043+ 	rec , err  :=  pts .GetRecord (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ])
2044+ 	if  err  !=  nil  {
2045+ 		return  false , err 
2046+ 	}
2047+ 
2048+ 	ptsUpdateLag  :=  changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
2049+ 	if  rec .Timestamp .AddDuration (ptsUpdateLag ).After (tableHighWater ) {
2050+ 		return  false , nil 
2051+ 	}
2052+ 
2053+ 	if  err  :=  pts .UpdateTimestamp (ctx , * ptsEntries .ProtectedTimestampRecords [tableID ], tableHighWater ); err  !=  nil  {
2054+ 		return  false , err 
2055+ 	}
2056+ 	return  true , nil 
2057+ }
2058+ 
2059+ func  (cf  * changeFrontier ) createPerTableProtectedTimestampRecord (
2060+ 	ctx  context.Context ,
2061+ 	txn  isql.Txn ,
2062+ 	ptsEntries  * changefeedpb.ProtectedTimestampRecords ,
2063+ 	tableID  descpb.ID ,
2064+ 	tableHighWater  hlc.Timestamp ,
2065+ 	pts  protectedts.Storage ,
2066+ ) error  {
2067+ 	// If the table is lagging and doesn't have a per table PTS record, 
2068+ 	// we create a new one. 
2069+ 	targets  :=  changefeedbase.Targets {}
2070+ 	if  cf .targets .Size  >  0  {
2071+ 		err  :=  cf .targets .EachTarget (func (target  changefeedbase.Target ) error  {
2072+ 			if  target .DescID  ==  tableID  {
2073+ 				targets .Add (target )
2074+ 			}
2075+ 			return  nil 
2076+ 		})
2077+ 		if  err  !=  nil  {
2078+ 			return  err 
2079+ 		}
2080+ 	}
2081+ 	ptr  :=  createProtectedTimestampRecord (
2082+ 		ctx , cf .FlowCtx .Codec (), cf .spec .JobID , targets , tableHighWater ,
2083+ 	)
2084+ 	if  ptsEntries .ProtectedTimestampRecords  ==  nil  {
2085+ 		ptsEntries .ProtectedTimestampRecords  =  make (map [descpb.ID ]* uuid.UUID )
2086+ 	}
2087+ 	uuid  :=  ptr .ID .GetUUID ()
2088+ 	ptsEntries .ProtectedTimestampRecords [tableID ] =  & uuid 
2089+ 	if  err  :=  pts .Protect (ctx , ptr ); err  !=  nil  {
2090+ 		return  err 
2091+ 	}
2092+ 	return  writeChangefeedJobInfo (ctx , perTableProtectedTimestampsFilename , ptsEntries , txn , cf .spec .JobID )
2093+ }
2094+ 
2095+ func  (cf  * changeFrontier ) advanceProtectedTimestamp (
2096+ 	ctx  context.Context ,
2097+ 	progress  * jobspb.ChangefeedProgress ,
2098+ 	pts  protectedts.Storage ,
2099+ 	timestamp  hlc.Timestamp ,
2100+ ) (updated  bool , err  error ) {
19252101	if  progress .ProtectedTimestampRecord  ==  uuid .Nil  {
19262102		ptr  :=  createProtectedTimestampRecord (
1927- 			ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , highWater ,
2103+ 			ctx , cf .FlowCtx .Codec (), cf .spec .JobID , cf .targets , timestamp ,
19282104		)
19292105		progress .ProtectedTimestampRecord  =  ptr .ID .GetUUID ()
19302106		return  true , pts .Protect (ctx , ptr )
@@ -1942,7 +2118,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19422118		if  preserveDeprecatedPts  :=  cf .knobs .PreserveDeprecatedPts  !=  nil  &&  cf .knobs .PreserveDeprecatedPts (); preserveDeprecatedPts  {
19432119			return  false , nil 
19442120		}
1945- 		if  err  :=  cf .remakePTSRecord (ctx , pts , progress , highWater ); err  !=  nil  {
2121+ 		if  err  :=  cf .remakePTSRecord (ctx , pts , progress , timestamp ); err  !=  nil  {
19462122			return  false , err 
19472123		}
19482124		return  true , nil 
@@ -1955,26 +2131,28 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19552131		if  preservePTSTargets  :=  cf .knobs .PreservePTSTargets  !=  nil  &&  cf .knobs .PreservePTSTargets (); preservePTSTargets  {
19562132			return  false , nil 
19572133		}
1958- 		if  err  :=  cf .remakePTSRecord (ctx , pts , progress , highWater ); err  !=  nil  {
2134+ 		if  err  :=  cf .remakePTSRecord (ctx , pts , progress , timestamp ); err  !=  nil  {
19592135			return  false , err 
19602136		}
2137+ 		log .VEventf (ctx , 2 , "remade PTS record %v to include all targets" , progress .ProtectedTimestampRecord )
19612138		return  true , nil 
19622139	}
19632140
2141+ 	ptsUpdateLag  :=  changefeedbase .ProtectTimestampLag .Get (& cf .FlowCtx .Cfg .Settings .SV )
19642142	// Only update the PTS timestamp if it is lagging behind the high 
19652143	// watermark. This is to prevent a rush of updates to the PTS if the 
19662144	// changefeed restarts, which can cause contention and second order effects 
19672145	// on system tables. 
1968- 	if  ! rec .Timestamp .AddDuration (ptsUpdateLag ).Less ( highWater ) {
2146+ 	if  rec .Timestamp .AddDuration (ptsUpdateLag ).After ( timestamp ) {
19692147		return  false , nil 
19702148	}
19712149
19722150	if  cf .knobs .ManagePTSError  !=  nil  {
19732151		return  false , cf .knobs .ManagePTSError ()
19742152	}
19752153
1976- 	log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , highWater )
1977- 	return  true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , highWater )
2154+ 	log .VEventf (ctx , 2 , "updating protected timestamp %v at %v" , progress .ProtectedTimestampRecord , timestamp )
2155+ 	return  true , pts .UpdateTimestamp (ctx , progress .ProtectedTimestampRecord , timestamp )
19782156}
19792157
19802158func  (cf  * changeFrontier ) remakePTSRecord (
0 commit comments