-
Notifications
You must be signed in to change notification settings - Fork 4
feat: Add batch extender + persistent fixes #45
Changes from all commits
9933ad8
13dfccd
270e906
4218fbf
e329580
92c5b7f
84210c3
01a97b1
d24fde9
49c27fa
15e3e3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this file should have testing to cover all functions |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| package sequencing | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "github.com/dgraph-io/badger/v4" | ||
| "github.com/rollkit/go-sequencing" | ||
| ) | ||
|
|
||
| // BatchQueue ... | ||
| type BatchQueue struct { | ||
| queue []sequencing.Batch | ||
| mu sync.Mutex | ||
| } | ||
|
|
||
| // NewBatchQueue creates a new BatchQueue | ||
| func NewBatchQueue() *BatchQueue { | ||
| return &BatchQueue{ | ||
| queue: make([]sequencing.Batch, 0), | ||
| } | ||
| } | ||
|
|
||
| // AddBatch adds a new batch to the queue | ||
| func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { | ||
| bq.mu.Lock() | ||
| bq.queue = append(bq.queue, batch) | ||
| bq.mu.Unlock() | ||
|
|
||
|
Comment on lines
+25
to
+28
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it make sense to perform the batch validation steps before adding the batch to the queue? If we can't Hash or Marshal the batch, I would think that we should reject it from the queue. |
||
| // Get the hash and bytes of the batch | ||
| h, err := batch.Hash() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Marshal the batch | ||
| batchBytes, err := batch.Marshal() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Store the batch in BadgerDB | ||
| err = db.Update(func(txn *badger.Txn) error { | ||
| key := append(keyPrefixBatch, h...) | ||
| return txn.Set(key, batchBytes) | ||
| }) | ||
| return err | ||
| } | ||
|
|
||
| // AddBatchToTheTop adds a new batch to the queue, at index 0 | ||
| func (bq *BatchQueue) AddBatchToTheTop(batch sequencing.Batch, db *badger.DB) error { | ||
| bq.mu.Lock() | ||
| bq.queue = append([]sequencing.Batch{batch}, bq.queue...) | ||
| bq.mu.Unlock() | ||
|
Comment on lines
+51
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about order of the function |
||
|
|
||
| // Get the hash and bytes of the batch | ||
| h, err := batch.Hash() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Marshal the batch | ||
| batchBytes, err := batch.Marshal() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Store the batch in BadgerDB | ||
| err = db.Update(func(txn *badger.Txn) error { | ||
| key := append(keyPrefixBatch, h...) | ||
| return txn.Set(key, batchBytes) | ||
| }) | ||
| return err | ||
| } | ||
|
|
||
| // Next extracts a batch of transactions from the queue | ||
| func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { | ||
| bq.mu.Lock() | ||
| defer bq.mu.Unlock() | ||
| if len(bq.queue) == 0 { | ||
| return &sequencing.Batch{Transactions: nil}, nil | ||
| } | ||
| batch := bq.queue[0] | ||
| bq.queue = bq.queue[1:] | ||
|
|
||
| h, err := batch.Hash() | ||
| if err != nil { | ||
| return &sequencing.Batch{Transactions: nil}, err | ||
| } | ||
|
Comment on lines
+85
to
+88
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similar comments here. if we are validating the batch before it enters the queue, then we should not be hitting an error here unless there was some sort of data corruption event. So having the checks here is still valid. However, before returning on error, we should also be removing the batch from the database, otherwise the database would get bloated with corrupt batches that aren't in the queue. |
||
|
|
||
| // Remove the batch from BadgerDB after processing | ||
| err = db.Update(func(txn *badger.Txn) error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to address the previous comment, this could be called in a defer function after pull the batch out of the queue. The error can just be logged for debug purposes. |
||
| // Get the batch to ensure it exists in the DB before deleting | ||
| key := append(keyPrefixBatch, h...) | ||
| _, err := txn.Get(key) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // Delete the batch from BadgerDB | ||
| return txn.Delete(key) | ||
| }) | ||
| if err != nil { | ||
| return &sequencing.Batch{Transactions: nil}, err | ||
| } | ||
|
Comment on lines
+101
to
+103
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should nil out the returned batch on error deleting it from the database. |
||
|
|
||
| return &batch, nil | ||
| } | ||
|
|
||
| // LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. | ||
| func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is a reason to clean up the handling of bad batches, otherwise we keep loading the bad batches back into memory because they aren't being deleted from the database |
||
| bq.mu.Lock() | ||
| defer bq.mu.Unlock() | ||
|
|
||
| err := db.View(func(txn *badger.Txn) error { | ||
| // Create an iterator to go through all batches stored in BadgerDB | ||
| opts := badger.DefaultIteratorOptions | ||
| opts.Prefix = keyPrefixBatch | ||
| it := txn.NewIterator(opts) | ||
| defer it.Close() | ||
|
|
||
| for it.Rewind(); it.Valid(); it.Next() { | ||
| item := it.Item() | ||
| err := item.Value(func(val []byte) error { | ||
| var batch sequencing.Batch | ||
| // Unmarshal the batch bytes and add them to the in-memory queue | ||
| err := batch.Unmarshal(val) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| bq.queue = append(bq.queue, batch) | ||
| return nil | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| }) | ||
|
|
||
| return err | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.