Commit bf939fa

*: don't handle live updates of column size (#58596)
close #58595
1 parent bee268d · commit bf939fa

47 files changed (+148, -510 lines)
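
For context, summarized from the diffs below: IMPORT INTO used to track column sizes live while encoding rows — the encoder exposed per-column byte sizes (KVEncoder.GetColumnSize), ProcessChunkWithWriter accumulated them into importer.Progress, each subtask reported them in Result.ColSizeMap, and the scheduler merged those maps and passed them to importer.FlushTableStats. After this commit only the loaded row count is aggregated and flushed. The standalone Go sketch below (placeholder types, not TiDB code) illustrates the per-subtask map merge that updateResult no longer performs:

package main

import "fmt"

// subtaskResult mirrors the shape of the removed aggregation: each subtask
// used to report per-column byte sizes keyed by column ID (ColSizeMap) next
// to the loaded row count, which is the only field kept after this commit.
type subtaskResult struct {
    LoadedRowCnt uint64
    ColSizeMap   map[int64]int64 // removed by this commit
}

// mergeResults sketches what the scheduler's updateResult used to do:
// sum row counts and merge per-column size deltas across all subtasks.
func mergeResults(subtasks []subtaskResult) (rows uint64, colSize map[int64]int64) {
    colSize = make(map[int64]int64)
    for _, st := range subtasks {
        rows += st.LoadedRowCnt
        for colID, size := range st.ColSizeMap { // this inner merge is what was deleted
            colSize[colID] += size
        }
    }
    return rows, colSize
}

func main() {
    rows, colSize := mergeResults([]subtaskResult{
        {LoadedRowCnt: 2, ColSizeMap: map[int64]int64{1: 10, 2: 20}},
        {LoadedRowCnt: 3, ColSizeMap: map[int64]int64{1: 5}},
    })
    fmt.Println(rows, colSize) // 5 map[1:15 2:20]
}

With ColSizeMap gone, only the row-count sum reaches importer.FlushTableStats, as the scheduler.go diff below shows.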

pkg/disttask/importinto/proto.go

Lines changed: 0 additions & 2 deletions

@@ -110,7 +110,6 @@ type SharedVars struct {
     TableImporter *importer.TableImporter
     DataEngine *backend.OpenedEngine
     IndexEngine *backend.OpenedEngine
-    Progress *importer.Progress

     mu sync.Mutex
     Checksum *verification.KVGroupChecksum
@@ -183,5 +182,4 @@ type Checksum struct {
 // This portion of the code may be implemented uniformly in the framework in the future.
 type Result struct {
     LoadedRowCnt uint64
-    ColSizeMap map[int64]int64
 }

pkg/disttask/importinto/scheduler.go

Lines changed: 1 addition & 7 deletions

@@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
         }
         subtaskMetas = append(subtaskMetas, &subtaskMeta)
     }
-    columnSizeMap := make(map[int64]int64)
     for _, subtaskMeta := range subtaskMetas {
         taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
-        for key, val := range subtaskMeta.Result.ColSizeMap {
-            columnSizeMap[key] += val
-        }
     }
-    taskMeta.Result.ColSizeMap = columnSizeMap

     if globalSort {
         taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
@@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
         func(ctx context.Context) (bool, error) {
             return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
                 if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
-                    Affected: taskMeta.Result.LoadedRowCnt,
-                    ColSizeMap: taskMeta.Result.ColSizeMap,
+                    Affected: taskMeta.Result.LoadedRowCnt,
                 }); err != nil {
                     logger.Warn("flush table stats failed", zap.Error(err))
                 }

pkg/disttask/importinto/subtask_executor.go

Lines changed: 0 additions & 2 deletions

@@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
         sharedVars.TableImporter,
         sharedVars.DataEngine,
         sharedVars.IndexEngine,
-        sharedVars.Progress,
         logger,
         checksum,
     ); err != nil {
@@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
         sharedVars.TableImporter,
         dataWriter,
         indexWriter,
-        sharedVars.Progress,
         logger,
         checksum,
     ); err != nil {

pkg/disttask/importinto/task_executor.go

Lines changed: 0 additions & 2 deletions

@@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
         TableImporter: s.tableImporter,
         DataEngine: dataEngine,
         IndexEngine: indexEngine,
-        Progress: importer.NewProgress(),
         Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
         SortedDataMeta: &external.SortedKVMeta{},
         SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
@@ -251,7 +250,6 @@ func (s *importStepExecutor) onFinished(ctx context.Context, subtask *proto.Subt
     }
     subtaskMeta.Result = Result{
         LoadedRowCnt: dataKVCount,
-        ColSizeMap: sharedVars.Progress.GetColSize(),
     }
     allocators := sharedVars.TableImporter.Allocators()
     subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{

pkg/executor/delete.go

Lines changed: 1 addition & 1 deletion

@@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
         return err
     }

-    err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
+    err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
     if err != nil {
         return err
     }

pkg/executor/importer/BUILD.bazel

Lines changed: 0 additions & 1 deletion

@@ -9,7 +9,6 @@ go_library(
         "job.go",
         "kv_encode.go",
         "precheck.go",
-        "progress.go",
         "table_import.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/executor/importer",

pkg/executor/importer/engine_process.go

Lines changed: 1 addition & 4 deletions

@@ -30,7 +30,6 @@ func ProcessChunk(
     chunk *checkpoints.ChunkCheckpoint,
     tableImporter *TableImporter,
     dataEngine, indexEngine *backend.OpenedEngine,
-    progress *Progress,
     logger *zap.Logger,
     groupChecksum *verification.KVGroupChecksum,
 ) error {
@@ -65,7 +64,7 @@
         }
     }()

-    return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
+    return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
 }

 // ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
@@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
     chunk *checkpoints.ChunkCheckpoint,
     tableImporter *TableImporter,
     dataWriter, indexWriter backend.EngineWriter,
-    progress *Progress,
     logger *zap.Logger,
     groupChecksum *verification.KVGroupChecksum,
 ) error {
@@ -116,6 +114,5 @@
     if err != nil {
         return err
     }
-    progress.AddColSize(encoder.GetColumnSize())
     return nil
 }

pkg/executor/importer/import.go

Lines changed: 2 additions & 3 deletions

@@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

 // JobImportResult is the result of the job import.
 type JobImportResult struct {
-    Affected uint64
-    Warnings []contextutil.SQLWarn
-    ColSizeMap variable.DeltaColsMap
+    Affected uint64
+    Warnings []contextutil.SQLWarn
 }

 // GetMsgFromBRError get msg from BR error.

pkg/executor/importer/importer_testkit_test.go

Lines changed: 2 additions & 6 deletions

@@ -311,11 +311,9 @@ func TestProcessChunkWith(t *testing.T) {
     defer ti.Backend().CloseEngineMgr()
     kvWriter := mock.NewMockEngineWriter(ctrl)
     kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-    progress := importer.NewProgress()
     checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
-    err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+    err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
     require.NoError(t, err)
-    require.Len(t, progress.GetColSize(), 3)
     checksumMap := checksum.GetInnerChecksums()
     require.Len(t, checksumMap, 1)
     require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
@@ -343,11 +341,9 @@
     ti.SetSelectedRowCh(rowsCh)
     kvWriter := mock.NewMockEngineWriter(ctrl)
     kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-    progress := importer.NewProgress()
     checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
-    err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+    err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
     require.NoError(t, err)
-    require.Len(t, progress.GetColSize(), 3)
     checksumMap := checksum.GetInnerChecksums()
     require.Len(t, checksumMap, 1)
     require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])

pkg/executor/importer/kv_encode.go

Lines changed: 0 additions & 6 deletions

@@ -35,8 +35,6 @@ import (
 // KVEncoder encodes a row of data into a KV pair.
 type KVEncoder interface {
     Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
-    // GetColumnSize returns the size of each column in the current encoder.
-    GetColumnSize() map[int64]int64
     io.Closer
 }

@@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
     return en.Record2KV(record, row, rowID)
 }

-func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
-    return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
-}
-
 // todo merge with code in load_data.go
 func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
     row := make([]types.Datum, 0, len(en.insertColumns))
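
For quick reference, a standalone sketch of the encoder interface as it looks after this commit (Datum and Pairs are placeholders, not TiDB's real types.Datum and kv.Pairs; the shape is taken from the kv_encode.go diff above):

package sketch

import "io"

// Datum and Pairs stand in for TiDB's types.Datum and kv.Pairs.
type Datum struct{}
type Pairs struct{}

// KVEncoder after this commit: GetColumnSize() map[int64]int64 is gone, so
// callers can no longer read live per-column byte sizes from the encoder.
type KVEncoder interface {
    Encode(row []Datum, rowID int64) (*Pairs, error)
    io.Closer
}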
