Skip to content

Commit 1ff82ab

Browse files
committed
rewrite metadata caching by persist into db
1 parent 7b01c4a commit 1ff82ab

File tree

10 files changed

+54
-214
lines changed

10 files changed

+54
-214
lines changed

coordinator/internal/controller/api/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,5 @@ func InitController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.D
4141

4242
Auth = NewAuthController(db, cfg, vf)
4343
GetTask = NewGetTaskController(cfg, chainCfg, db, reg)
44-
SubmitProof = NewSubmitProofController(cfg, chainCfg, db, vf, reg, GetTask)
44+
SubmitProof = NewSubmitProofController(cfg, chainCfg, db, vf, reg)
4545
}

coordinator/internal/controller/api/submit_proof.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"gorm.io/gorm"
1010

1111
"scroll-tech/common/types"
12-
"scroll-tech/common/types/message"
1312

1413
"scroll-tech/coordinator/internal/config"
1514
"scroll-tech/coordinator/internal/logic/submitproof"
@@ -23,15 +22,10 @@ type SubmitProofController struct {
2322
}
2423

2524
// NewSubmitProofController create the submit proof api controller instance
26-
func NewSubmitProofController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.DB, vf *verifier.Verifier, reg prometheus.Registerer, getTaskController *GetTaskController) *SubmitProofController {
27-
controller := SubmitProofController{
25+
func NewSubmitProofController(cfg *config.Config, chainCfg *params.ChainConfig, db *gorm.DB, vf *verifier.Verifier, reg prometheus.Registerer) *SubmitProofController {
26+
return &SubmitProofController{
2827
submitProofReceiverLogic: submitproof.NewSubmitProofReceiverLogic(cfg.ProverManager, chainCfg, db, vf, reg),
2928
}
30-
proverTaskIf := getTaskController.ProverTasks()
31-
controller.submitProofReceiverLogic.ChunkTask = proverTaskIf[message.ProofTypeChunk]
32-
controller.submitProofReceiverLogic.BatchTask = proverTaskIf[message.ProofTypeBatch]
33-
controller.submitProofReceiverLogic.BundleTask = proverTaskIf[message.ProofTypeBundle]
34-
return &controller
3529
}
3630

3731
// SubmitProof prover submit the proof to coordinator

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ func NewBatchProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
4747
batchOrm: orm.NewBatch(db),
4848
proverTaskOrm: orm.NewProverTask(db),
4949
proverBlockListOrm: orm.NewProverBlockList(db),
50-
taskCache: newCache(128),
5150
},
5251
batchTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
5352
Name: "coordinator_batch_get_task_total",
@@ -162,26 +161,28 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
162161
AssignedAt: utils.NowUTC(),
163162
}
164163

165-
// Store session info.
166-
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
167-
bp.recoverActiveAttempts(ctx, batchTask)
168-
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
169-
return nil, ErrCoordinatorInternalFailure
170-
}
171-
172164
taskMsg, err := bp.formatProverTask(ctx.Copy(), &proverTask, batchTask, hardForkName)
173165
if err != nil {
174166
bp.recoverActiveAttempts(ctx, batchTask)
175167
log.Error("format prover task failure", "task_id", batchTask.Hash, "err", err)
176168
return nil, ErrCoordinatorInternalFailure
177169
}
178170
if getTaskParameter.Universal {
179-
taskMsg, err = bp.applyUniversal(taskMsg)
171+
var metadata []byte
172+
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
180173
if err != nil {
181174
bp.recoverActiveAttempts(ctx, batchTask)
182175
log.Error("Generate universal prover task failure", "task_id", batchTask.Hash, "type", "batch")
183176
return nil, ErrCoordinatorInternalFailure
184177
}
178+
proverTask.Metadata = metadata
179+
}
180+
181+
// Store session info.
182+
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
183+
bp.recoverActiveAttempts(ctx, batchTask)
184+
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
185+
return nil, ErrCoordinatorInternalFailure
185186
}
186187

187188
bp.batchTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
@@ -194,39 +195,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
194195
return taskMsg, nil
195196
}
196197

197-
func (bp *BatchProverTask) GetTaskMetaData(ctx *gin.Context, proverTask *orm.ProverTask, hardForkName string) (string, error) {
198-
taskID := proverTask.TaskID
199-
if cached := bp.taskCache.Query(taskID); cached != nil {
200-
return cached.MetaData, nil
201-
}
202-
203-
// for most case we simply query metadata from cache, but if the task is not in cache, we need quite a few
204-
// effort to resume the cached universal task
205-
batchTask, err := bp.batchOrm.GetBatchByHash(ctx.Copy(), taskID)
206-
if err != nil {
207-
log.Error("failed to get chunk by hash", "task_id", taskID, "err", err)
208-
return "", ErrCoordinatorInternalFailure
209-
}
210-
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, batchTask, hardForkName)
211-
if err != nil {
212-
log.Error("re-format prover task failure", "task_id", taskID, "err", err)
213-
return "", ErrCoordinatorInternalFailure
214-
}
215-
216-
_, err = bp.applyUniversal(taskMsg)
217-
if err != nil {
218-
log.Error("Generate universal prover task failure", "task_id", taskID, "type", "chunk")
219-
return "", ErrCoordinatorInternalFailure
220-
}
221-
222-
if cached := bp.taskCache.Query(taskID); cached == nil {
223-
log.Error("Still can not obtain metadata, something wrong?", "task_id", taskID, "type", "chunk")
224-
return "", ErrCoordinatorInternalFailure
225-
} else {
226-
return cached.MetaData, nil
227-
}
228-
}
229-
230198
func (bp *BatchProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, batch *orm.Batch, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
231199
// get chunk from db
232200
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func NewBundleProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *g
4545
bundleOrm: orm.NewBundle(db),
4646
proverTaskOrm: orm.NewProverTask(db),
4747
proverBlockListOrm: orm.NewProverBlockList(db),
48-
taskCache: newCache(128),
4948
},
5049
bundleTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
5150
Name: "coordinator_bundle_get_task_total",
@@ -160,26 +159,28 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
160159
AssignedAt: utils.NowUTC(),
161160
}
162161

163-
// Store session info.
164-
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
165-
bp.recoverActiveAttempts(ctx, bundleTask)
166-
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
167-
return nil, ErrCoordinatorInternalFailure
168-
}
169-
170162
taskMsg, err := bp.formatProverTask(ctx.Copy(), &proverTask, hardForkName)
171163
if err != nil {
172164
bp.recoverActiveAttempts(ctx, bundleTask)
173165
log.Error("format bundle prover task failure", "task_id", bundleTask.Hash, "err", err)
174166
return nil, ErrCoordinatorInternalFailure
175167
}
176168
if getTaskParameter.Universal {
177-
taskMsg, err = bp.applyUniversal(taskMsg)
169+
var metadata []byte
170+
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
178171
if err != nil {
179172
bp.recoverActiveAttempts(ctx, bundleTask)
180173
log.Error("Generate universal prover task failure", "task_id", bundleTask.Hash, "type", "bundle")
181174
return nil, ErrCoordinatorInternalFailure
182175
}
176+
proverTask.Metadata = metadata
177+
}
178+
179+
// Store session info.
180+
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
181+
bp.recoverActiveAttempts(ctx, bundleTask)
182+
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
183+
return nil, ErrCoordinatorInternalFailure
183184
}
184185

185186
bp.bundleTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
@@ -192,34 +193,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
192193
return taskMsg, nil
193194
}
194195

195-
func (bp *BundleProverTask) GetTaskMetaData(ctx *gin.Context, proverTask *orm.ProverTask, hardForkName string) (string, error) {
196-
taskID := proverTask.TaskID
197-
if cached := bp.taskCache.Query(taskID); cached != nil {
198-
return cached.MetaData, nil
199-
}
200-
201-
// for most case we simply query metadata from cache, but if the task is not in cache, we need quite a few
202-
// effort to resume the cached universal task
203-
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, hardForkName)
204-
if err != nil {
205-
log.Error("re-format prover task failure", "task_id", taskID, "err", err)
206-
return "", ErrCoordinatorInternalFailure
207-
}
208-
209-
_, err = bp.applyUniversal(taskMsg)
210-
if err != nil {
211-
log.Error("Generate universal prover task failure", "task_id", taskID, "type", "chunk")
212-
return "", ErrCoordinatorInternalFailure
213-
}
214-
215-
if cached := bp.taskCache.Query(taskID); cached == nil {
216-
log.Error("Still can not obtain metadata, something wrong?", "task_id", taskID, "type", "chunk")
217-
return "", ErrCoordinatorInternalFailure
218-
} else {
219-
return cached.MetaData, nil
220-
}
221-
}
222-
223196
func (bp *BundleProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
224197
// get bundle from db
225198
batches, err := bp.batchOrm.GetBatchesByBundleHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ func NewChunkProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
4343
blockOrm: orm.NewL2Block(db),
4444
proverTaskOrm: orm.NewProverTask(db),
4545
proverBlockListOrm: orm.NewProverBlockList(db),
46-
taskCache: newCache(1024),
4746
},
4847
chunkTaskGetTaskTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
4948
Name: "coordinator_chunk_get_task_total",
@@ -158,25 +157,27 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
158157
AssignedAt: utils.NowUTC(),
159158
}
160159

161-
if err = cp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
162-
cp.recoverActiveAttempts(ctx, chunkTask)
163-
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
164-
return nil, ErrCoordinatorInternalFailure
165-
}
166-
167160
taskMsg, err := cp.formatProverTask(ctx.Copy(), &proverTask, chunkTask, hardForkName)
168161
if err != nil {
169162
cp.recoverActiveAttempts(ctx, chunkTask)
170163
log.Error("format prover task failure", "task_id", chunkTask.Hash, "err", err)
171164
return nil, ErrCoordinatorInternalFailure
172165
}
173166
if getTaskParameter.Universal {
174-
taskMsg, err = cp.applyUniversal(taskMsg)
167+
var metadata []byte
168+
taskMsg, metadata, err = cp.applyUniversal(taskMsg)
175169
if err != nil {
176170
cp.recoverActiveAttempts(ctx, chunkTask)
177171
log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk")
178172
return nil, ErrCoordinatorInternalFailure
179173
}
174+
proverTask.Metadata = metadata
175+
}
176+
177+
if err = cp.proverTaskOrm.InsertProverTask(ctx.Copy(), &proverTask); err != nil {
178+
cp.recoverActiveAttempts(ctx, chunkTask)
179+
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
180+
return nil, ErrCoordinatorInternalFailure
180181
}
181182

182183
cp.chunkTaskGetTaskTotal.WithLabelValues(hardForkName).Inc()
@@ -189,39 +190,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
189190
return taskMsg, nil
190191
}
191192

192-
func (cp *ChunkProverTask) GetTaskMetaData(ctx *gin.Context, proverTask *orm.ProverTask, hardForkName string) (string, error) {
193-
taskID := proverTask.TaskID
194-
if cached := cp.taskCache.Query(taskID); cached != nil {
195-
return cached.MetaData, nil
196-
}
197-
198-
// for most case we simply query metadata from cache, but if the task is not in cache, we need quite a few
199-
// effort to resume the cached universal task
200-
chunkTask, err := cp.chunkOrm.GetChunkByHash(ctx.Copy(), taskID)
201-
if err != nil {
202-
log.Error("failed to get chunk by hash", "task_id", taskID, "err", err)
203-
return "", ErrCoordinatorInternalFailure
204-
}
205-
taskMsg, err := cp.formatProverTask(ctx.Copy(), proverTask, chunkTask, hardForkName)
206-
if err != nil {
207-
log.Error("re-format prover task failure", "task_id", taskID, "err", err)
208-
return "", ErrCoordinatorInternalFailure
209-
}
210-
211-
_, err = cp.applyUniversal(taskMsg)
212-
if err != nil {
213-
log.Error("Generate universal prover task failure", "task_id", taskID, "type", "chunk")
214-
return "", ErrCoordinatorInternalFailure
215-
}
216-
217-
if cached := cp.taskCache.Query(taskID); cached == nil {
218-
log.Error("Still can not obtain metadata, something wrong?", "task_id", taskID, "type", "chunk")
219-
return "", ErrCoordinatorInternalFailure
220-
} else {
221-
return cached.MetaData, nil
222-
}
223-
}
224-
225193
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, chunk *orm.Chunk, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
226194
// Get block hashes.
227195
blockHashes, dbErr := cp.blockOrm.GetL2BlockHashesByChunkHash(ctx, task.TaskID)

coordinator/internal/logic/provertask/prover_task.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ var (
3434
// ProverTask the interface of a collector who send data to prover
3535
type ProverTask interface {
3636
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
37-
GetTaskMetaData(ctx *gin.Context, task *orm.ProverTask, HardForkName string) (string, error)
3837
}
3938

4039
// BaseProverTask a base prover task which contain series functions
@@ -49,8 +48,6 @@ type BaseProverTask struct {
4948
blockOrm *orm.L2Block
5049
proverTaskOrm *orm.ProverTask
5150
proverBlockListOrm *orm.ProverBlockList
52-
53-
taskCache *TaskCache
5451
}
5552

5653
type proverTaskContext struct {
@@ -189,24 +186,15 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e
189186
return &ptc, nil
190187
}
191188

192-
func (b *BaseProverTask) applyUniversal(schema *coordinatorType.GetTaskSchema) (*coordinatorType.GetTaskSchema, error) {
193-
if cached := b.taskCache.Query(schema.TaskID); cached != nil {
194-
schema.TaskData = cached.UTaskData
195-
return schema, nil
196-
}
189+
func (b *BaseProverTask) applyUniversal(schema *coordinatorType.GetTaskSchema) (*coordinatorType.GetTaskSchema, []byte, error) {
197190

198191
ok, uTaskData, metadata, _ := libzkp.GenerateUniversalTask(schema.TaskType, schema.TaskData, schema.HardForkName)
199192
if !ok {
200-
return nil, fmt.Errorf("can not generate universal task, see coordinator log for the reason")
193+
return nil, nil, fmt.Errorf("can not generate universal task, see coordinator log for the reason")
201194
}
202195

203-
cacheData := CachedTaskData{
204-
MetaData: metadata,
205-
UTaskData: uTaskData,
206-
}
207-
b.taskCache.Add(schema.TaskID, &cacheData)
208196
schema.TaskData = uTaskData
209-
return schema, nil
197+
return schema, []byte(metadata), nil
210198
}
211199

212200
func newGetTaskCounterVec(factory promauto.Factory, taskType string) *prometheus.CounterVec {

coordinator/internal/logic/provertask/task_cache.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

0 commit comments

Comments
 (0)