Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0dfa72b
added contexts to Celestia DataAvailabilityLayerClient
Nov 22, 2022
283f715
added context to mock, grpc, and celestia
Nov 22, 2022
f6d3372
pass contexts into DataAvailabilityLayerClient from BlockManager methods
Nov 22, 2022
b87fee3
Update da/da.go
S1nus Nov 22, 2022
d6c9ee8
Update da/da.go
S1nus Nov 22, 2022
fc09ed7
Update da/celestia/celestia.go
S1nus Nov 22, 2022
5aad473
Update da/celestia/celestia.go
S1nus Nov 22, 2022
5f96859
Update da/celestia/celestia.go
S1nus Nov 22, 2022
00a83a6
Update da/da.go
S1nus Nov 22, 2022
d61daf8
renamed context to ctx
Nov 22, 2022
660fff8
added context to da tests
Nov 23, 2022
5d211a6
gofmt repo
Nov 23, 2022
a199744
ran goimports
Nov 23, 2022
28377e5
Revert "ran goimports"- don't want to mess with tendermint files.
Nov 30, 2022
de72520
goimports on da/da.go
Nov 30, 2022
fc9c50b
reverted clist_test.go
Nov 30, 2022
945ea59
unformatted clist
Nov 30, 2022
e80290f
Merge branch 'main' into connor/contexts
S1nus Nov 30, 2022
1985eea
renamed context to ctx
Dec 12, 2022
64811b3
use context.Background() for da_test.go
Dec 12, 2022
111e2c1
da_test context
Dec 12, 2022
c70fa56
Merge branch 'main' into connor/contexts
S1nus Dec 12, 2022
1d9de66
gofmt da
Dec 13, 2022
aa4d548
Merge branch 'main' into connor/contexts
S1nus Dec 13, 2022
ee91772
switched to background context in doTestDALC
Dec 15, 2022
4b36550
Merge branch 'connor/contexts' of github.com:celestiaorg/rollmint int…
Dec 15, 2022
f51c3c7
Merge branch 'main' into connor/contexts
S1nus Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
for {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("retrieve", "daHeight", daHeight)
err := m.processNextDABlock()
err := m.processNextDABlock(ctx)
if err != nil {
m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error())
break
Expand All @@ -358,15 +358,15 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
}
}

func (m *Manager) processNextDABlock() error {
func (m *Manager) processNextDABlock(ctx context.Context) error {
// TODO(tzdybal): extract configuration option
maxRetries := 10
daHeight := atomic.LoadUint64(&m.daHeight)

var err error
m.logger.Debug("trying to retrieve block from DA", "daHeight", daHeight)
for r := 0; r < maxRetries; r++ {
blockResp, fetchErr := m.fetchBlock(daHeight)
blockResp, fetchErr := m.fetchBlock(ctx, daHeight)
if fetchErr != nil {
err = multierr.Append(err, fetchErr)
time.Sleep(100 * time.Millisecond)
Expand All @@ -381,9 +381,9 @@ func (m *Manager) processNextDABlock() error {
return err
}

func (m *Manager) fetchBlock(daHeight uint64) (da.ResultRetrieveBlocks, error) {
func (m *Manager) fetchBlock(ctx context.Context, daHeight uint64) (da.ResultRetrieveBlocks, error) {
var err error
blockRes := m.retriever.RetrieveBlocks(daHeight)
blockRes := m.retriever.RetrieveBlocks(ctx, daHeight)
switch blockRes.Code {
case da.StatusError:
err = fmt.Errorf("failed to retrieve block: %s", blockRes.Message)
Expand Down Expand Up @@ -534,7 +534,7 @@ func (m *Manager) submitBlockToDA(ctx context.Context, block *types.Block) error
submitted := false
backoff := initialBackoff
for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ {
res := m.dalc.SubmitBlock(block)
res := m.dalc.SubmitBlock(ctx, block)
if res.Code == da.StatusSuccess {
m.logger.Info("successfully submitted rollmint block to DA layer", "rollmintHeight", block.Header.Height, "daHeight", res.DAHeight)
submitted = true
Expand Down
12 changes: 6 additions & 6 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *DataAvailabilityLayerClient) Stop() error {
}

// SubmitBlock submits a block to DA layer.
func (c *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock {
func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
blob, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlock{
Expand All @@ -73,7 +73,7 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}
}

txResponse, err := c.client.SubmitPFD(context.TODO(), c.namespaceID, blob, c.config.GasLimit)
txResponse, err := c.client.SubmitPFD(ctx, c.namespaceID, blob, c.config.GasLimit)

if err != nil {
return da.ResultSubmitBlock{
Expand Down Expand Up @@ -103,8 +103,8 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}

// CheckBlockAvailability queries DA layer to check data availability of block at given height.
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(dataLayerHeight uint64) da.ResultCheckBlock {
shares, err := c.client.NamespacedShares(context.TODO(), c.namespaceID, dataLayerHeight)
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(ctx context.Context, dataLayerHeight uint64) da.ResultCheckBlock {
shares, err := c.client.NamespacedShares(ctx, c.namespaceID, dataLayerHeight)
if err != nil {
return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Expand All @@ -124,8 +124,8 @@ func (c *DataAvailabilityLayerClient) CheckBlockAvailability(dataLayerHeight uin
}

// RetrieveBlocks gets a batch of blocks from DA layer.
func (c *DataAvailabilityLayerClient) RetrieveBlocks(dataLayerHeight uint64) da.ResultRetrieveBlocks {
data, err := c.client.NamespacedData(context.TODO(), c.namespaceID, dataLayerHeight)
func (c *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, dataLayerHeight uint64) da.ResultRetrieveBlocks {
data, err := c.client.NamespacedData(ctx, c.namespaceID, dataLayerHeight)
if err != nil {
return da.ResultRetrieveBlocks{
BaseResult: da.BaseResult{
Expand Down
6 changes: 3 additions & 3 deletions da/celestia/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *Server) submit(w http.ResponseWriter, r *http.Request) {
return
}

res := s.mock.SubmitBlock(&block)
res := s.mock.SubmitBlock(r.Context(), &block)
code := 0
if res.Code != da.StatusSuccess {
code = 3
Expand All @@ -118,7 +118,7 @@ func (s *Server) shares(w http.ResponseWriter, r *http.Request) {
return
}

res := s.mock.RetrieveBlocks(height)
res := s.mock.RetrieveBlocks(r.Context(), height)
if res.Code != da.StatusSuccess {
s.writeError(w, errors.New(res.Message))
return
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *Server) data(w http.ResponseWriter, r *http.Request) {
return
}

res := s.mock.RetrieveBlocks(height)
res := s.mock.RetrieveBlocks(r.Context(), height)
if res.Code != da.StatusSuccess {
s.writeError(w, errors.New(res.Message))
return
Expand Down
8 changes: 5 additions & 3 deletions da/da.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package da

import (
"context"

"github.com/celestiaorg/rollmint/log"
"github.com/celestiaorg/rollmint/store"
"github.com/celestiaorg/rollmint/types"
Expand Down Expand Up @@ -69,15 +71,15 @@ type DataAvailabilityLayerClient interface {
// SubmitBlock submits the passed in block to the DA layer.
// This should create a transaction which (potentially)
// triggers a state transition in the DA layer.
SubmitBlock(block *types.Block) ResultSubmitBlock
SubmitBlock(ctx context.Context, block *types.Block) ResultSubmitBlock

// CheckBlockAvailability queries DA layer to check data availability of block corresponding at given height.
CheckBlockAvailability(dataLayerHeight uint64) ResultCheckBlock
CheckBlockAvailability(ctx context.Context, dataLayerHeight uint64) ResultCheckBlock
}

// BlockRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
// block data from DA layer. This gives the ability to use it for block synchronization.
type BlockRetriever interface {
// RetrieveBlocks returns blocks at given data layer height from data availability layer.
RetrieveBlocks(dataLayerHeight uint64) ResultRetrieveBlocks
RetrieveBlocks(ctx context.Context, dataLayerHeight uint64) ResultRetrieveBlocks
}
12 changes: 6 additions & 6 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (d *DataAvailabilityLayerClient) Stop() error {
}

// SubmitBlock proxies SubmitBlock request to gRPC server.
func (d *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock {
resp, err := d.client.SubmitBlock(context.TODO(), &dalc.SubmitBlockRequest{Block: block.ToProto()})
func (d *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
resp, err := d.client.SubmitBlock(ctx, &dalc.SubmitBlockRequest{Block: block.ToProto()})
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()},
Expand All @@ -91,8 +91,8 @@ func (d *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}

// CheckBlockAvailability proxies CheckBlockAvailability request to gRPC server.
func (d *DataAvailabilityLayerClient) CheckBlockAvailability(daHeight uint64) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(context.TODO(), &dalc.CheckBlockAvailabilityRequest{DAHeight: daHeight})
func (d *DataAvailabilityLayerClient) CheckBlockAvailability(ctx context.Context, daHeight uint64) da.ResultCheckBlock {
resp, err := d.client.CheckBlockAvailability(ctx, &dalc.CheckBlockAvailabilityRequest{DAHeight: daHeight})
if err != nil {
return da.ResultCheckBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
Expand All @@ -103,8 +103,8 @@ func (d *DataAvailabilityLayerClient) CheckBlockAvailability(daHeight uint64) da
}

// RetrieveBlocks proxies RetrieveBlocks request to gRPC server.
func (d *DataAvailabilityLayerClient) RetrieveBlocks(daHeight uint64) da.ResultRetrieveBlocks {
resp, err := d.client.RetrieveBlocks(context.TODO(), &dalc.RetrieveBlocksRequest{DAHeight: daHeight})
func (d *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeight uint64) da.ResultRetrieveBlocks {
resp, err := d.client.RetrieveBlocks(ctx, &dalc.RetrieveBlocksRequest{DAHeight: daHeight})
if err != nil {
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
Expand Down
12 changes: 6 additions & 6 deletions da/grpc/mockserv/mockserv.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type mockImpl struct {
mock mock.DataAvailabilityLayerClient
}

func (m *mockImpl) SubmitBlock(_ context.Context, request *dalc.SubmitBlockRequest) (*dalc.SubmitBlockResponse, error) {
func (m *mockImpl) SubmitBlock(ctx context.Context, request *dalc.SubmitBlockRequest) (*dalc.SubmitBlockResponse, error) {
var b types.Block
err := b.FromProto(request.Block)
if err != nil {
return nil, err
}
resp := m.mock.SubmitBlock(&b)
resp := m.mock.SubmitBlock(ctx, &b)
return &dalc.SubmitBlockResponse{
Result: &dalc.DAResponse{
Code: dalc.StatusCode(resp.Code),
Expand All @@ -52,8 +52,8 @@ func (m *mockImpl) SubmitBlock(_ context.Context, request *dalc.SubmitBlockReque
}, nil
}

func (m *mockImpl) CheckBlockAvailability(_ context.Context, request *dalc.CheckBlockAvailabilityRequest) (*dalc.CheckBlockAvailabilityResponse, error) {
resp := m.mock.CheckBlockAvailability(request.DAHeight)
func (m *mockImpl) CheckBlockAvailability(ctx context.Context, request *dalc.CheckBlockAvailabilityRequest) (*dalc.CheckBlockAvailabilityResponse, error) {
resp := m.mock.CheckBlockAvailability(ctx, request.DAHeight)
return &dalc.CheckBlockAvailabilityResponse{
Result: &dalc.DAResponse{
Code: dalc.StatusCode(resp.Code),
Expand All @@ -63,8 +63,8 @@ func (m *mockImpl) CheckBlockAvailability(_ context.Context, request *dalc.Check
}, nil
}

func (m *mockImpl) RetrieveBlocks(context context.Context, request *dalc.RetrieveBlocksRequest) (*dalc.RetrieveBlocksResponse, error) {
resp := m.mock.RetrieveBlocks(request.DAHeight)
func (m *mockImpl) RetrieveBlocks(ctx context.Context, request *dalc.RetrieveBlocksRequest) (*dalc.RetrieveBlocksResponse, error) {
resp := m.mock.RetrieveBlocks(ctx, request.DAHeight)
blocks := make([]*rollmint.Block, len(resp.Blocks))
for i := range resp.Blocks {
blocks[i] = resp.Blocks[i].ToProto()
Expand Down
9 changes: 5 additions & 4 deletions da/mock/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mock

import (
"context"
"encoding/binary"
"math/rand"
"sync/atomic"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (m *DataAvailabilityLayerClient) Stop() error {
// SubmitBlock submits the passed in block to the DA layer.
// This should create a transaction which (potentially)
// triggers a state transition in the DA layer.
func (m *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock {
func (m *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
daHeight := atomic.LoadUint64(&m.daHeight)
m.logger.Debug("Submitting block to DA layer!", "height", block.Header.Height, "dataLayerHeight", daHeight)

Expand Down Expand Up @@ -97,13 +98,13 @@ func (m *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultS
}

// CheckBlockAvailability queries DA layer to check data availability of block corresponding to given header.
func (m *DataAvailabilityLayerClient) CheckBlockAvailability(daHeight uint64) da.ResultCheckBlock {
blocksRes := m.RetrieveBlocks(daHeight)
func (m *DataAvailabilityLayerClient) CheckBlockAvailability(ctx context.Context, daHeight uint64) da.ResultCheckBlock {
blocksRes := m.RetrieveBlocks(ctx, daHeight)
return da.ResultCheckBlock{BaseResult: da.BaseResult{Code: blocksRes.Code}, DataAvailable: len(blocksRes.Blocks) > 0}
}

// RetrieveBlocks returns block at given height from data availability layer.
func (m *DataAvailabilityLayerClient) RetrieveBlocks(daHeight uint64) da.ResultRetrieveBlocks {
func (m *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeight uint64) da.ResultRetrieveBlocks {
if daHeight >= atomic.LoadUint64(&m.daHeight) {
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: "block not found"}}
}
Expand Down
19 changes: 11 additions & 8 deletions da/test/da_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package test

import (
"context"
"encoding/json"
"math/rand"
"net"
Expand Down Expand Up @@ -72,6 +73,7 @@ func TestDALC(t *testing.T) {
func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) {
require := require.New(t)
assert := assert.New(t)
ctx := context.Background()

// mock DALC will advance block height every 100ms
conf := []byte{}
Expand Down Expand Up @@ -99,27 +101,27 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) {
b1 := getRandomBlock(1, 10)
b2 := getRandomBlock(2, 10)

resp := dalc.SubmitBlock(b1)
resp := dalc.SubmitBlock(ctx, b1)
h1 := resp.DAHeight
assert.Equal(da.StatusSuccess, resp.Code)

resp = dalc.SubmitBlock(b2)
resp = dalc.SubmitBlock(ctx, b2)
h2 := resp.DAHeight
assert.Equal(da.StatusSuccess, resp.Code)

// wait a bit more than mockDaBlockTime, so rollmint blocks can be "included" in mock block
time.Sleep(mockDaBlockTime + 20*time.Millisecond)

check := dalc.CheckBlockAvailability(h1)
check := dalc.CheckBlockAvailability(ctx, h1)
assert.Equal(da.StatusSuccess, check.Code)
assert.True(check.DataAvailable)

check = dalc.CheckBlockAvailability(h2)
check = dalc.CheckBlockAvailability(ctx, h2)
assert.Equal(da.StatusSuccess, check.Code)
assert.True(check.DataAvailable)

// this height should not be used by DALC
check = dalc.CheckBlockAvailability(h1 - 1)
check = dalc.CheckBlockAvailability(ctx, h1-1)
assert.Equal(da.StatusSuccess, check.Code)
assert.False(check.DataAvailable)
}
Expand Down Expand Up @@ -173,6 +175,7 @@ func startMockCelestiaNodeServer(t *testing.T) *cmock.Server {
}

func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {
ctx := context.Background()
require := require.New(t)
assert := assert.New(t)

Expand Down Expand Up @@ -204,7 +207,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {

for i := uint64(0); i < 100; i++ {
b := getRandomBlock(i, rand.Int()%20)
resp := dalc.SubmitBlock(b)
resp := dalc.SubmitBlock(ctx, b)
assert.Equal(da.StatusSuccess, resp.Code, resp.Message)
time.Sleep(time.Duration(rand.Int63() % mockDaBlockTime.Milliseconds()))

Expand All @@ -217,14 +220,14 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {

for h, cnt := range countAtHeight {
t.Log("Retrieving block, DA Height", h)
ret := retriever.RetrieveBlocks(h)
ret := retriever.RetrieveBlocks(ctx, h)
assert.Equal(da.StatusSuccess, ret.Code, ret.Message)
require.NotEmpty(ret.Blocks, h)
assert.Len(ret.Blocks, cnt, h)
}

for b, h := range blocks {
ret := retriever.RetrieveBlocks(h)
ret := retriever.RetrieveBlocks(ctx, h)
assert.Equal(da.StatusSuccess, ret.Code, h)
require.NotEmpty(ret.Blocks, h)
assert.Contains(ret.Blocks, b, h)
Expand Down