From de3f8c2b1f181a77c30ad49ecce5b5fa72a63722 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Sun, 18 Jul 2021 15:42:08 +0800 Subject: [PATCH 1/3] Fix data race in bleve indexer --- modules/indexer/bleve/batch.go | 56 +++++++++++++++++++++++++++++++++ modules/indexer/code/bleve.go | 10 +++--- modules/indexer/issues/bleve.go | 6 ++-- 3 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 modules/indexer/bleve/batch.go diff --git a/modules/indexer/bleve/batch.go b/modules/indexer/bleve/batch.go new file mode 100644 index 0000000000000..32f69010ffd15 --- /dev/null +++ b/modules/indexer/bleve/batch.go @@ -0,0 +1,56 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bleve + +import ( + "github.com/blevesearch/bleve/v2" +) + +// FlushingBatch is a batch of operations that automatically flushes to the +// underlying index once it reaches a certain size. +type FlushingBatch struct { + maxBatchSize int + batch *bleve.Batch + index bleve.Index +} + +// NewFlushingBatch creates a new flushing batch for the specified index. Once +// the number of operations in the batch reaches the specified limit, the batch +// automatically flushes its operations to the index. 
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { + return &FlushingBatch{ + maxBatchSize: maxBatchSize, + batch: index.NewBatch(), + index: index, + } +} + +func (b *FlushingBatch) Index(id string, data interface{}) error { + if err := b.batch.Index(id, data); err != nil { + return err + } + return b.flushIfFull() +} + +func (b *FlushingBatch) Delete(id string) error { + b.batch.Delete(id) + return b.flushIfFull() +} + +func (b *FlushingBatch) flushIfFull() error { + if b.batch.Size() < b.maxBatchSize { + return nil + } + return b.Flush() +} + +func (b *FlushingBatch) Flush() error { + err := b.index.Batch(b.batch) + if err != nil { + return err + } + b.batch = b.index.NewBatch() + return nil +} diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index 600789a284096..fc5c602dbef08 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -18,6 +18,7 @@ import ( "code.gitea.io/gitea/modules/analyze" "code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/git" + gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" @@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { return indexer, created, err } -func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { +func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, + update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { return nil @@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader * }) } -func (b *BleveIndexer) 
addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { +func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error { id := filenameIndexerID(repo.ID, filename) return batch.Delete(id) } @@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() { // Index indexes the data func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { - batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) if len(changes.Updates) > 0 { batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) @@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error { if err != nil { return err } - batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) for _, hit := range result.Hits { if err = batch.Delete(hit.ID); err != nil { return err diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go index b1385eb676256..db12874e84e58 100644 --- a/modules/indexer/issues/bleve.go +++ b/modules/indexer/issues/bleve.go @@ -9,8 +9,10 @@ import ( "os" "strconv" + gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" + "github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" "github.com/blevesearch/bleve/v2/analysis/token/lowercase" @@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() { // Index will save the index data func (b *BleveIndexer) Index(issues []*IndexerData) error { - batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) for _, issue := range issues { if err := batch.Index(indexerID(issue.ID), struct { RepoID int64 @@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error { // Delete deletes indexes by ids func 
(b *BleveIndexer) Delete(ids ...int64) error { - batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) for _, id := range ids { if err := batch.Delete(indexerID(id)); err != nil { return err From ca2378994a16ba531441c54048306b1d912b4b08 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Sun, 18 Jul 2021 15:44:54 +0800 Subject: [PATCH 2/3] Fix copyright head --- modules/indexer/bleve/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/indexer/bleve/batch.go b/modules/indexer/bleve/batch.go index 32f69010ffd15..14d24c592898d 100644 --- a/modules/indexer/bleve/batch.go +++ b/modules/indexer/bleve/batch.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. +// Copyright 2021 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. From 16404c336854cb7e431e963fa43b62d5a50b2ff7 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Sun, 18 Jul 2021 15:46:20 +0800 Subject: [PATCH 3/3] Fix lint --- modules/indexer/bleve/batch.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/indexer/bleve/batch.go b/modules/indexer/bleve/batch.go index 14d24c592898d..79994e6e5be35 100644 --- a/modules/indexer/bleve/batch.go +++ b/modules/indexer/bleve/batch.go @@ -27,6 +27,7 @@ func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { } } +// Index adds an index operation to the batch func (b *FlushingBatch) Index(id string, data interface{}) error { if err := b.batch.Index(id, data); err != nil { return err @@ -34,6 +35,7 @@ func (b *FlushingBatch) Index(id string, data interface{}) error { return b.flushIfFull() } +// Delete adds a delete operation to the batch func (b *FlushingBatch) Delete(id string) error { b.batch.Delete(id) return b.flushIfFull() @@ -46,6 +48,7 @@ func (b *FlushingBatch) flushIfFull() error { return b.Flush() } +// Flush submits the batch to the index and creates a new
one func (b *FlushingBatch) Flush() error { err := b.index.Batch(b.batch) if err != nil {