Skip to content

Commit c63ae2f

Browse files
committed
add resolved tables to specs
1 parent f4a53b3 commit c63ae2f

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
16+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
1718
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1819
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -470,6 +471,20 @@ func makePlan(
470471
}
471472
}
472473

474+
var resolvedTables *changefeedpb.ResolvedTables
475+
if progressConfig != nil && progressConfig.PerTableTracking {
476+
var rt changefeedpb.ResolvedTables
477+
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
478+
return readChangefeedJobInfo(ctx, resolvedTablesFilename, &rt, txn, jobID)
479+
}); err != nil {
480+
return nil, nil, err
481+
}
482+
resolvedTables = &rt
483+
if log.ExpensiveLogEnabled(ctx, 2) {
484+
log.Dev.Infof(ctx, "read resolved tables: %v", resolvedTables)
485+
}
486+
}
487+
473488
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
474489
for i, sp := range spanPartitions {
475490
if log.ExpensiveLogEnabled(ctx, 2) {
@@ -493,6 +508,7 @@ func makePlan(
493508
Select: execinfrapb.Expression{Expr: details.Select},
494509
Description: description,
495510
ProgressConfig: progressConfig,
511+
ResolvedTables: resolvedTables,
496512
}
497513
}
498514

@@ -508,6 +524,7 @@ func makePlan(
508524
UserProto: execCtx.User().EncodeProto(),
509525
Description: description,
510526
ProgressConfig: progressConfig,
527+
ResolvedTables: resolvedTables,
511528
}
512529

513530
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ func writeChangefeedJobInfo(
4949

5050
// readChangefeedJobInfo reads a changefeed job info protobuf from the
5151
// job_info table. A changefeed-specific filename is required.
52-
// TODO(#148119): Use this function to read.
5352
func readChangefeedJobInfo(
5453
ctx context.Context, filename string, info protoutil.Message, txn isql.Txn, jobID jobspb.JobID,
5554
) error {

0 commit comments

Comments
 (0)