@@ -13,12 +13,14 @@ import (
1313 "net"
1414 "strconv"
1515 "strings"
16+ "sync"
1617 "time"
1718
1819 repo_model "code.gitea.io/gitea/models/repo"
1920 "code.gitea.io/gitea/modules/analyze"
2021 "code.gitea.io/gitea/modules/charset"
2122 "code.gitea.io/gitea/modules/git"
23+ "code.gitea.io/gitea/modules/graceful"
2224 "code.gitea.io/gitea/modules/json"
2325 "code.gitea.io/gitea/modules/log"
2426 "code.gitea.io/gitea/modules/setting"
@@ -46,6 +48,7 @@ type ElasticSearchIndexer struct {
4648 available bool
4749 availabilityCallback func (bool )
4850 stopTimer chan struct {}
51+ lock sync.RWMutex
4952}
5053
5154type elasticLogger struct {
@@ -144,7 +147,7 @@ func (b *ElasticSearchIndexer) realIndexerName() string {
144147
145148// Init will initialize the indexer
146149func (b * ElasticSearchIndexer ) init () (bool , error ) {
147- ctx := context . Background ()
150+ ctx := graceful . GetManager (). HammerContext ()
148151 exists , err := b .client .IndexExists (b .realIndexerName ()).Do (ctx )
149152 if err != nil {
150153 return false , b .checkError (err )
@@ -198,11 +201,15 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
198201
199202// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
200203func (b * ElasticSearchIndexer ) SetAvailabilityChangeCallback (callback func (bool )) {
204+ b .lock .Lock ()
205+ defer b .lock .Unlock ()
201206 b .availabilityCallback = callback
202207}
203208
204209// Ping checks if elastic is available
205210func (b * ElasticSearchIndexer ) Ping () bool {
211+ b .lock .RLock ()
212+ defer b .lock .RUnlock ()
206213 return b .available
207214}
208215
@@ -305,7 +312,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
305312 _ , err := b .client .Bulk ().
306313 Index (b .indexerAliasName ).
307314 Add (reqs ... ).
308- Do (context . Background () )
315+ Do (ctx )
309316 return b .checkError (err )
310317 }
311318 return nil
@@ -315,7 +322,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
315322func (b * ElasticSearchIndexer ) Delete (repoID int64 ) error {
316323 _ , err := b .client .DeleteByQuery (b .indexerAliasName ).
317324 Query (elastic .NewTermsQuery ("repo_id" , repoID )).
318- Do (context . Background ())
325+ Do (graceful . GetManager (). HammerContext ())
319326 return b .checkError (err )
320327}
321328
@@ -397,7 +404,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
397404}
398405
399406// Search searches for codes and language stats by given conditions.
400- func (b * ElasticSearchIndexer ) Search (repoIDs []int64 , language , keyword string , page , pageSize int , isMatch bool ) (int64 , []* SearchResult , []* SearchResultLanguages , error ) {
407+ func (b * ElasticSearchIndexer ) Search (ctx context. Context , repoIDs []int64 , language , keyword string , page , pageSize int , isMatch bool ) (int64 , []* SearchResult , []* SearchResultLanguages , error ) {
401408 searchType := esMultiMatchTypeBestFields
402409 if isMatch {
403410 searchType = esMultiMatchTypePhrasePrefix
@@ -438,7 +445,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
438445 ).
439446 Sort ("repo_id" , true ).
440447 From (start ).Size (pageSize ).
441- Do (context . Background () )
448+ Do (ctx )
442449 if err != nil {
443450 return 0 , nil , nil , b .checkError (err )
444451 }
@@ -452,7 +459,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
452459 Aggregation ("language" , aggregation ).
453460 Query (query ).
454461 Size (0 ). // We only needs stats information
455- Do (context . Background () )
462+ Do (ctx )
456463 if err != nil {
457464 return 0 , nil , nil , b .checkError (err )
458465 }
@@ -469,7 +476,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
469476 ).
470477 Sort ("repo_id" , true ).
471478 From (start ).Size (pageSize ).
472- Do (context . Background () )
479+ Do (ctx )
473480 if err != nil {
474481 return 0 , nil , nil , b .checkError (err )
475482 }
@@ -486,27 +493,41 @@ func (b *ElasticSearchIndexer) Close() {
486493
487494func (b * ElasticSearchIndexer ) checkError (err error ) error {
488495 var opErr * net.OpError
489- if b .available && (elastic .IsConnErr (err ) || (errors .As (err , & opErr ) && (opErr .Op == "dial" || opErr .Op == "read" ))) {
490- b .available = false
491- if b .availabilityCallback != nil {
492- b .availabilityCallback (b .available )
493- }
496+ if ! (elastic .IsConnErr (err ) || (errors .As (err , & opErr ) && (opErr .Op == "dial" || opErr .Op == "read" ))) {
497+ return err
494498 }
499+
500+ b .setAvailability (false )
501+
495502 return err
496503}
497504
498505func (b * ElasticSearchIndexer ) checkAvailability () {
499- if b .available {
506+ if b .Ping () {
500507 return
501508 }
502509
503510 // Request cluster state to check if elastic is available again
504- _ , err := b .client .ClusterState ().Do (context . Background ())
511+ _ , err := b .client .ClusterState ().Do (graceful . GetManager (). ShutdownContext ())
505512 if err != nil {
513+ b .setAvailability (false )
514+ return
515+ }
516+
517+ b .setAvailability (true )
518+ }
519+
520+ func (b * ElasticSearchIndexer ) setAvailability (available bool ) {
521+ b .lock .Lock ()
522+ defer b .lock .Unlock ()
523+
524+ if b .available == available {
506525 return
507526 }
508- b .available = true
527+
528+ b .available = available
509529 if b .availabilityCallback != nil {
530+ // Call the callback from within the lock to ensure that the ordering remains correct
510531 b .availabilityCallback (b .available )
511532 }
512533}
0 commit comments