25 changes: 17 additions & 8 deletions plugins/elastic/bulk_indexing/bulk_indexing.go
@@ -25,12 +25,13 @@ package bulk_indexing

import (
	"fmt"
-	"infini.sh/framework/core/locker"
	"runtime"
	"strings"
	"sync"
	"time"

+	"infini.sh/framework/core/locker"
+
	"github.com/OneOfOne/xxhash"
	log "github.com/cihub/seelog"
@@ -96,8 +97,7 @@ type Config struct {
	MaxWorkers int `config:"max_worker_size"`

	DoubleCheckOffsetBeforeBulk bool `config:"double_check_offset_before_bulk"`
-
-	DetectActiveQueue bool `config:"detect_active_queue"`
+	DetectActiveQueue bool `config:"detect_active_queue"`

	VerboseBulkResult bool `config:"verbose_bulk_result"`
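
As background on the `config:` struct tags used throughout this block: keys from the pipeline configuration are bound to struct fields by tag name. Below is a minimal sketch of that mechanism using reflection; the tag names are taken from the diff, but `applyConfig` and its map-based input are invented for illustration and are not the framework's actual loader.

package main

import (
	"fmt"
	"reflect"
)

// Config mirrors two of the fields from this diff; the tag names are real,
// everything else here is illustrative.
type Config struct {
	MaxWorkers        int  `config:"max_worker_size"`
	DetectActiveQueue bool `config:"detect_active_queue"`
}

// applyConfig copies values whose map key matches a field's `config` tag.
// Hypothetical helper: the real framework loader is more involved.
func applyConfig(dst interface{}, src map[string]interface{}) {
	v := reflect.ValueOf(dst).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		key := t.Field(i).Tag.Get("config")
		if val, ok := src[key]; ok {
			v.Field(i).Set(reflect.ValueOf(val))
		}
	}
}

func main() {
	var cfg Config
	applyConfig(&cfg, map[string]interface{}{
		"max_worker_size":     8,
		"detect_active_queue": true,
	})
	fmt.Printf("%+v\n", cfg) // prints {MaxWorkers:8 DetectActiveQueue:true}
}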

@@ -811,7 +811,6 @@ READ_DOCS:
log.Errorf("slice worker, worker:[%v], error on consume queue:[%v], slice_id:%v, no data fetched, offset: %v, err: %v", workerID, qConfig.Name, sliceID, ctx1, err)
}
goto CLEAN_BUFFER
return
}

log.Errorf("slice worker, worker:[%v], error on queue:[%v], slice_id:%v, %v", workerID, qConfig.Name, sliceID, err)
@@ -950,25 +949,36 @@ READ_DOCS:
				if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
					err := consumerInstance.CommitOffset(*offset)
					if err != nil {
+						log.Errorf("🔧 offset commit failed, worker:[%v], queue:[%v], slice:[%v], offset:[%v], err:%v", workerID, qConfig.Name, sliceID, *offset, err)
						panic(err)
					}

					if global.Env().IsDebug {
						log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] success commit offset:%v,ctx:%v,timeout:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, *offset, committedOffset, ctx1.String(), timeout, err)
					}
+					// fix: update committedOffset immediately after a successful commit to keep the state consistent
+					committedOffset = offset
+					log.Debugf("🔧 offset committed successfully, worker:[%v], queue:[%v], slice:[%v], offset:[%v]", workerID, qConfig.Name, sliceID, *offset)
				} else {
-					log.Error("offset not committed:", offset, ",moved to:", &pop.NextOffset)
+					if global.Env().IsDebug {
+						log.Debugf("🔧 offset not changed, skip commit, worker:[%v], queue:[%v], slice:[%v], offset:[%v], committed:[%v]", workerID, qConfig.Name, sliceID, offset, committedOffset)
+					}
				}
-				offset = &pop.NextOffset
+				// fix: moved outside the loop (line 970) to avoid updating the offset in the middle of a bulk submission
+				// offset = &pop.NextOffset
			}
		} else {
			log.Errorf("should not submit this bulk request, worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v, msg:%v", workerID, qConfig.ID, sliceID, committedOffset, offset, err, msgCount)
		}
	}

+	// fix: update the offset after each message is processed so tracked progress stays in sync with actual processing;
+	// even if the worker crashes before submission, messages already written to the buffer are not reprocessed after restart
+	offset = &pop.NextOffset
}

-offset = &ctx1.NextOffset
+// fix: removed to avoid overwriting the offset that is now updated inside the loop
+// offset = &ctx1.NextOffset
}

if time.Since(lastCommit) > idleDuration && mainBuf.GetMessageSize() > 0 {
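
Taken together, the changes in this hunk implement a standard at-least-once consumer pattern: keep two cursors (the next offset to process and the last committed offset), advance the first per message, and reconcile them only around a successful bulk submission. A condensed, self-contained sketch of that pattern; all types and helpers here (`Offset`, `buffer`, `submitBulk`, `commit`) are invented for illustration, not the framework's API:

package main

import "fmt"

// Offset stands in for the framework's queue offset type.
type Offset int64

type message struct {
	data       string
	nextOffset Offset // offset immediately after this message
}

// consume stages a batch into a bulk buffer, advancing `offset` per message
// (the per-message update this PR introduces), then commits only when the
// bulk submission succeeds and the offset actually moved. A crash before
// the commit replays the uncommitted messages on restart: at-least-once.
func consume(batch []message, committedOffset Offset) Offset {
	offset := committedOffset
	for _, msg := range batch {
		buffer(msg)
		offset = msg.nextOffset // progress tracks actual processing
	}
	if submitBulk() && offset != committedOffset {
		commit(offset)
		committedOffset = offset // keep the committed cursor in sync immediately
	}
	return committedOffset
}

func buffer(m message) { fmt.Println("buffered:", m.data) }
func submitBulk() bool { fmt.Println("bulk request submitted"); return true }
func commit(o Offset)  { fmt.Println("committed offset:", o) }

func main() {
	next := consume([]message{{"doc-1", 1}, {"doc-2", 2}}, 0)
	fmt.Println("resume from:", next)
}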
@@ -1174,7 +1184,6 @@ func (processor *BulkIndexingProcessor) getElasticsearchMetadata(qConfig *queue.
	if esConfig == nil {
		if processor.config.ElasticsearchConfig != nil {
			processor.config.ElasticsearchConfig.Source = "bulk_indexing"
-			esConfig = processor.config.ElasticsearchConfig
		}
		esConfig = processor.config.ElasticsearchConfig
	}
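
The removed assignment was redundant: the unconditional assignment just after the inner `if` already covers both branches. A stripped-down sketch of the resulting fallback shape; `ESConfig`, `processorConfig`, and `resolveESConfig` are simplified stand-ins, not the real types:

package main

import "fmt"

type ESConfig struct{ Source string }

type processorConfig struct{ ElasticsearchConfig *ESConfig }

// resolveESConfig falls back to the processor-level config when no per-queue
// config was found. The nil check only tags the Source; the assignment
// happens once afterwards, so assigning inside the inner if as well
// (the removed line) changed nothing observable.
func resolveESConfig(esConfig *ESConfig, pc processorConfig) *ESConfig {
	if esConfig == nil {
		if pc.ElasticsearchConfig != nil {
			pc.ElasticsearchConfig.Source = "bulk_indexing"
		}
		esConfig = pc.ElasticsearchConfig
	}
	return esConfig
}

func main() {
	pc := processorConfig{ElasticsearchConfig: &ESConfig{}}
	fmt.Println(resolveESConfig(nil, pc).Source) // bulk_indexing
}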