Skip to content

Commit 256bbf8

Browse files
committed
use one unique function to prepare leveldb
1 parent 825f9cb commit 256bbf8

6 files changed

+39
-38
lines changed

modules/queue/base_levelqueue.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ package queue
55

66
import (
77
"context"
8-
"errors"
9-
"path/filepath"
10-
"strings"
118

129
"code.gitea.io/gitea/modules/nosql"
1310

@@ -22,19 +19,6 @@ type baseLevelQueue struct {
2219

2320
var _ baseQueue = (*baseLevelQueue)(nil)
2421

25-
func prepareLevelQueueConfig(cfg *BaseConfig) error {
26-
if cfg.ConnStr == "" { // use data dir as conn str
27-
dir := cfg.DataDir
28-
if !filepath.IsAbs(dir) {
29-
return errors.New("invalid leveldb data dir")
30-
}
31-
cfg.ConnStr = dir
32-
} else if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
33-
return errors.New("invalid leveldb connection string")
34-
}
35-
return nil
36-
}
37-
3822
func newBaseLevelQueueGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
3923
if unique {
4024
return newBaseLevelQueueUnique(cfg)
@@ -43,16 +27,11 @@ func newBaseLevelQueueGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
4327
}
4428

4529
func newBaseLevelQueueSimple(cfg *BaseConfig) (baseQueue, error) {
46-
if err := prepareLevelQueueConfig(cfg); err != nil {
47-
return nil, err
48-
}
49-
50-
q := &baseLevelQueue{conn: cfg.ConnStr, cfg: cfg}
51-
db, err := nosql.GetManager().GetLevelDB(q.conn)
30+
conn, db, err := prepareLevelDB(cfg)
5231
if err != nil {
5332
return nil, err
5433
}
55-
34+
q := &baseLevelQueue{conn: conn, cfg: cfg}
5635
q.internal, err = levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
5736
if err != nil {
5837
return nil, err

modules/queue/base_levelqueue_common.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@ package queue
55

66
import (
77
"context"
8+
"errors"
9+
"path/filepath"
10+
"strings"
811
"sync"
912
"time"
1013

14+
"code.gitea.io/gitea/modules/nosql"
15+
1116
"gitea.com/lunny/levelqueue"
17+
"github.com/syndtr/goleveldb/leveldb"
1218
)
1319

1420
type baseLevelQueuePushPoper interface {
@@ -63,3 +69,24 @@ func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error)
6369
func baseLevelQueueCommon(cfg *BaseConfig, internal baseLevelQueuePushPoper, mu *sync.Mutex) *baseLevelQueueCommonImpl {
6470
return &baseLevelQueueCommonImpl{length: cfg.Length, internal: internal}
6571
}
72+
73+
func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
74+
if cfg.ConnStr == "" { // use data dir as conn str
75+
if !filepath.IsAbs(cfg.DataFullDir) {
76+
return "", nil, errors.New("invalid leveldb data dir")
77+
}
78+
conn = cfg.DataFullDir
79+
} else {
80+
if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
81+
return "", nil, errors.New("invalid leveldb connection string")
82+
}
83+
conn = cfg.ConnStr
84+
}
85+
for i := 0; i < 10; i++ {
86+
if db, err = nosql.GetManager().GetLevelDB(conn); err == nil {
87+
break
88+
}
89+
time.Sleep(1 * time.Second)
90+
}
91+
return conn, db, err
92+
}

modules/queue/base_levelqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func TestBaseLevelDB(t *testing.T) {
1515
_, err := newBaseLevelQueueGeneric(&BaseConfig{ConnStr: "redis://"}, false)
1616
assert.ErrorContains(t, err, "invalid leveldb connection string")
1717

18-
_, err = newBaseLevelQueueGeneric(&BaseConfig{DataDir: "relative"}, false)
18+
_, err = newBaseLevelQueueGeneric(&BaseConfig{DataFullDir: "relative"}, false)
1919
assert.ErrorContains(t, err, "invalid leveldb data dir")
2020

2121
testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)

modules/queue/base_levelqueue_unique.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,11 @@ type baseLevelQueueUnique struct {
2525
var _ baseQueue = (*baseLevelQueueUnique)(nil)
2626

2727
func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
28-
if err := prepareLevelQueueConfig(cfg); err != nil {
29-
return nil, err
30-
}
31-
32-
q := &baseLevelQueueUnique{conn: cfg.ConnStr, cfg: cfg}
33-
db, err := nosql.GetManager().GetLevelDB(q.conn)
28+
conn, db, err := prepareLevelDB(cfg)
3429
if err != nil {
3530
return nil, err
3631
}
37-
32+
q := &baseLevelQueueUnique{conn: conn, cfg: cfg}
3833
q.internal, err = levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
3934
if err != nil {
4035
return nil, err

modules/queue/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99

1010
type BaseConfig struct {
1111
ManagedName string
12+
DataFullDir string // the caller must prepare an absolute path
1213

13-
DataDir string
1414
ConnStr string
1515
Length int
1616

@@ -20,8 +20,8 @@ type BaseConfig struct {
2020
func toBaseConfig(managedName string, queueSetting setting.QueueSettings) *BaseConfig {
2121
baseConfig := &BaseConfig{
2222
ManagedName: managedName,
23+
DataFullDir: queueSetting.Datadir,
2324

24-
DataDir: queueSetting.Datadir,
2525
ConnStr: queueSetting.ConnStr,
2626
Length: queueSetting.Length,
2727
}

modules/queue/manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ CONN_STR = redis://
4545
assert.NoError(t, err)
4646
assert.Equal(t, "default", q.GetName())
4747
assert.Equal(t, "level", q.GetType())
48-
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/common"), q.baseConfig.DataDir)
48+
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/common"), q.baseConfig.DataFullDir)
4949
assert.Equal(t, 100, q.baseConfig.Length)
5050
assert.Equal(t, 20, q.batchLength)
51-
assert.Equal(t, q.baseConfig.DataDir, q.baseConfig.ConnStr)
51+
assert.Equal(t, "", q.baseConfig.ConnStr)
5252
assert.Equal(t, "default_queue", q.baseConfig.QueueFullName)
5353
assert.Equal(t, "default_queue_unique", q.baseConfig.SetFullName)
5454
assert.Equal(t, 10, q.GetWorkerMaxNumber())
@@ -83,7 +83,7 @@ MAX_WORKERS = 2
8383
q1 := createWorkerPoolQueue[string]("no-such", cfgProvider, nil, false)
8484
assert.Equal(t, "no-such", q1.GetName())
8585
assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
86-
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataDir)
86+
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
8787
assert.Equal(t, 100, q1.baseConfig.Length)
8888
assert.Equal(t, 20, q1.batchLength)
8989
assert.Equal(t, "addrs=127.0.0.1:6379 db=0", q1.baseConfig.ConnStr)
@@ -99,10 +99,10 @@ MAX_WORKERS = 2
9999
q2 := createWorkerPoolQueue("sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
100100
assert.Equal(t, "sub", q2.GetName())
101101
assert.Equal(t, "level", q2.GetType())
102-
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataDir)
102+
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
103103
assert.Equal(t, 102, q2.baseConfig.Length)
104104
assert.Equal(t, 22, q2.batchLength)
105-
assert.Equal(t, q2.baseConfig.DataDir, q2.baseConfig.ConnStr)
105+
assert.Equal(t, "", q2.baseConfig.ConnStr)
106106
assert.Equal(t, "sub_q2", q2.baseConfig.QueueFullName)
107107
assert.Equal(t, "sub_q2_u2", q2.baseConfig.SetFullName)
108108
assert.Equal(t, 2, q2.GetWorkerMaxNumber())

0 commit comments

Comments
 (0)