Skip to content

Help to recover from corrupted levelqueue #24912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions modules/queue/base_levelqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ package queue

import (
"context"
"sync/atomic"

"code.gitea.io/gitea/modules/nosql"
"code.gitea.io/gitea/modules/queue/lqinternal"

"gitea.com/lunny/levelqueue"
"github.com/syndtr/goleveldb/leveldb"
)

type baseLevelQueue struct {
internal *levelqueue.Queue
conn string
cfg *BaseConfig
internal atomic.Pointer[levelqueue.Queue]

conn string
cfg *BaseConfig
db *leveldb.DB
}

var _ baseQueue = (*baseLevelQueue)(nil)
Expand All @@ -31,42 +36,48 @@ func newBaseLevelQueueSimple(cfg *BaseConfig) (baseQueue, error) {
if err != nil {
return nil, err
}
q := &baseLevelQueue{conn: conn, cfg: cfg}
q.internal, err = levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
q := &baseLevelQueue{conn: conn, cfg: cfg, db: db}
lq, err := levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
if err != nil {
return nil, err
}

q.internal.Store(lq)
return q, nil
}

func (q *baseLevelQueue) PushItem(ctx context.Context, data []byte) error {
return baseLevelQueueCommon(q.cfg, q.internal, nil).PushItem(ctx, data)
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
return c.PushItem(ctx, data)
}

func (q *baseLevelQueue) PopItem(ctx context.Context) ([]byte, error) {
return baseLevelQueueCommon(q.cfg, q.internal, nil).PopItem(ctx)
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
return c.PopItem(ctx)
}

func (q *baseLevelQueue) HasItem(ctx context.Context, data []byte) (bool, error) {
return false, nil
}

func (q *baseLevelQueue) Len(ctx context.Context) (int, error) {
return int(q.internal.Len()), nil
return int(q.internal.Load().Len()), nil
}

func (q *baseLevelQueue) Close() error {
err := q.internal.Close()
err := q.internal.Load().Close()
_ = nosql.GetManager().CloseLevelDB(q.conn)
q.db = nil // the db is not managed by us, it's managed by the nosql manager
return err
}

func (q *baseLevelQueue) RemoveAll(ctx context.Context) error {
for q.internal.Len() > 0 {
if _, err := q.internal.LPop(); err != nil {
return err
}
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
lq, err := levelqueue.NewQueue(q.db, []byte(q.cfg.QueueFullName), false)
if err != nil {
return err
}
old := q.internal.Load()
q.internal.Store(lq)
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
return nil
}
17 changes: 9 additions & 8 deletions modules/queue/base_levelqueue_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
"github.com/syndtr/goleveldb/leveldb"
)

// baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue
type baseLevelQueuePushPoper interface {
RPush(data []byte) error
LPop() ([]byte, error)
Len() int64
}

type baseLevelQueueCommonImpl struct {
length int
internal baseLevelQueuePushPoper
mu *sync.Mutex
length int
internalFunc func() baseLevelQueuePushPoper
mu *sync.Mutex
}

func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
Expand All @@ -36,11 +37,11 @@ func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) er
defer q.mu.Unlock()
}

cnt := int(q.internal.Len())
cnt := int(q.internalFunc().Len())
if cnt >= q.length {
return true, nil
}
retry, err = false, q.internal.RPush(data)
retry, err = false, q.internalFunc().RPush(data)
if err == levelqueue.ErrAlreadyInQueue {
err = ErrAlreadyInQueue
}
Expand All @@ -55,7 +56,7 @@ func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error)
defer q.mu.Unlock()
}

data, err = q.internal.LPop()
data, err = q.internalFunc().LPop()
if err == levelqueue.ErrNotFound {
return true, nil, nil
}
Expand All @@ -66,8 +67,8 @@ func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error)
})
}

func baseLevelQueueCommon(cfg *BaseConfig, internal baseLevelQueuePushPoper, mu *sync.Mutex) *baseLevelQueueCommonImpl {
return &baseLevelQueueCommonImpl{length: cfg.Length, internal: internal}
func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl {
return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc}
}

func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
Expand Down
55 changes: 55 additions & 0 deletions modules/queue/base_levelqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package queue
import (
"testing"

"code.gitea.io/gitea/modules/queue/lqinternal"
"code.gitea.io/gitea/modules/setting"

"gitea.com/lunny/levelqueue"
"github.com/stretchr/testify/assert"
"github.com/syndtr/goleveldb/leveldb"
)

func TestBaseLevelDB(t *testing.T) {
Expand All @@ -21,3 +24,55 @@ func TestBaseLevelDB(t *testing.T) {
testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)
testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true)
}

func TestCorruptedLevelQueue(t *testing.T) {
// sometimes the levelqueue could be in a corrupted state, this test is to make sure it can recover from it
dbDir := t.TempDir() + "/levelqueue-test"
db, err := leveldb.OpenFile(dbDir, nil)
if !assert.NoError(t, err) {
return
}
defer db.Close()

assert.NoError(t, db.Put([]byte("other-key"), []byte("other-value"), nil))

nameQueuePrefix := []byte("queue_name")
nameSetPrefix := []byte("set_name")
lq, err := levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
assert.NoError(t, err)
assert.NoError(t, lq.RPush([]byte("item-1")))

itemKey := lqinternal.QueueItemKeyBytes(nameQueuePrefix, 1)
itemValue, err := db.Get(itemKey, nil)
assert.NoError(t, err)
assert.Equal(t, []byte("item-1"), itemValue)

// there should be 5 keys in db: queue low, queue high, 1 queue item, 1 set item, and "other-key"
keys := lqinternal.ListLevelQueueKeys(db)
assert.Len(t, keys, 5)

// delete the queue item key, to corrupt the queue
assert.NoError(t, db.Delete(itemKey, nil))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does deleting an item corrupt the queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this PR's description:

if the keys in LevelDB went out-of-sync, the LevelQueue itself doesn't have the ability to recover, eg:

  • LevelQueue.Len() reports 100
  • LevelQueue.LPop() reports ErrNotFound = errors.New("no key found")

It's levelqueue's bug, not Gitea's. The levelqueue.LPop was written as this.

// now the queue is corrupted, it never works again
_, err = lq.LPop()
assert.ErrorIs(t, err, levelqueue.ErrNotFound)
assert.NoError(t, lq.Close())

// remove all the queue related keys to reset the queue
lqinternal.RemoveLevelQueueKeys(db, nameQueuePrefix)
lqinternal.RemoveLevelQueueKeys(db, nameSetPrefix)
// now there should be only 1 key in db: "other-key"
keys = lqinternal.ListLevelQueueKeys(db)
assert.Len(t, keys, 1)
assert.Equal(t, []byte("other-key"), keys[0])

// re-create a queue from db
lq, err = levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
assert.NoError(t, err)
assert.NoError(t, lq.RPush([]byte("item-new-1")))
// now the queue works again
itemValue, err = lq.LPop()
assert.NoError(t, err)
assert.Equal(t, []byte("item-new-1"), itemValue)
assert.NoError(t, lq.Close())
}
58 changes: 25 additions & 33 deletions modules/queue/base_levelqueue_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ package queue
import (
"context"
"sync"
"unsafe"
"sync/atomic"

"code.gitea.io/gitea/modules/nosql"
"code.gitea.io/gitea/modules/queue/lqinternal"

"gitea.com/lunny/levelqueue"
"github.com/syndtr/goleveldb/leveldb"
)

type baseLevelQueueUnique struct {
internal *levelqueue.UniqueQueue
conn string
cfg *BaseConfig
internal atomic.Pointer[levelqueue.UniqueQueue]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why must this be atomic now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because RemoveAll needs to re-create the levelqueue. The levelqueue is used by different goroutines.


conn string
cfg *BaseConfig
db *leveldb.DB

mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together
}
Expand All @@ -29,68 +32,57 @@ func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
if err != nil {
return nil, err
}
q := &baseLevelQueueUnique{conn: conn, cfg: cfg}
q.internal, err = levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
q := &baseLevelQueueUnique{conn: conn, cfg: cfg, db: db}
lq, err := levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
if err != nil {
return nil, err
}

q.internal.Store(lq)
return q, nil
}

func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error {
return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PushItem(ctx, data)
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
return c.PushItem(ctx, data)
}

func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) {
return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PopItem(ctx)
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
return c.PopItem(ctx)
}

func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()
return q.internal.Has(data)
return q.internal.Load().Has(data)
}

func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) {
q.mu.Lock()
defer q.mu.Unlock()
return int(q.internal.Len()), nil
return int(q.internal.Load().Len()), nil
}

func (q *baseLevelQueueUnique) Close() error {
q.mu.Lock()
defer q.mu.Unlock()
err := q.internal.Close()
err := q.internal.Load().Close()
q.db = nil // the db is not managed by us, it's managed by the nosql manager
_ = nosql.GetManager().CloseLevelDB(q.conn)
return err
}

func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()

type levelUniqueQueue struct {
q *levelqueue.Queue
set *levelqueue.Set
db *leveldb.DB
}
lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))

for lq.q.Len() > 0 {
if _, err := lq.q.LPop(); err != nil {
return err
}
}

// the "set" must be cleared after the "list" because there is no transaction.
// it's better to have duplicate items than losing items.
members, err := lq.set.Members()
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.SetFullName))
lq, err := levelqueue.NewUniqueQueue(q.db, []byte(q.cfg.QueueFullName), []byte(q.cfg.SetFullName), false)
if err != nil {
return err // seriously corrupted
}
for _, v := range members {
_, _ = lq.set.Remove(v)
return err
}
old := q.internal.Load()
q.internal.Store(lq)
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
return nil
}
48 changes: 48 additions & 0 deletions modules/queue/lqinternal/lqinternal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package lqinternal

import (
"bytes"
"encoding/binary"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

func QueueItemIDBytes(id int64) []byte {
buf := make([]byte, 8)
binary.PutVarint(buf, id)
return buf
}

func QueueItemKeyBytes(prefix []byte, id int64) []byte {
key := make([]byte, len(prefix), len(prefix)+1+8)
copy(key, prefix)
key = append(key, '-')
return append(key, QueueItemIDBytes(id)...)
}

func RemoveLevelQueueKeys(db *leveldb.DB, namePrefix []byte) {
keyPrefix := make([]byte, len(namePrefix)+1)
copy(keyPrefix, namePrefix)
keyPrefix[len(namePrefix)] = '-'

it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
defer it.Release()
for it.Next() {
if bytes.HasPrefix(it.Key(), keyPrefix) {
_ = db.Delete(it.Key(), nil)
}
}
}

func ListLevelQueueKeys(db *leveldb.DB) (res [][]byte) {
it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
defer it.Release()
for it.Next() {
res = append(res, it.Key())
}
return res
}