Skip to content

Commit cb691fc

Browse files
committed
support beacon node client as blob provider
1 parent 0b2fe3b commit cb691fc

File tree

10 files changed

+215
-57
lines changed

10 files changed

+215
-57
lines changed

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ var (
175175
utils.DASnapshotFileFlag,
176176
utils.DABlockNativeAPIEndpointFlag,
177177
utils.DABlobScanAPIEndpointFlag,
178+
utils.DABeaconNodeAPIEndpointFlag,
178179
}
179180

180181
rpcFlags = []cli.Flag{

cmd/utils/flags.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,12 +884,15 @@ var (
884884
DABlobScanAPIEndpointFlag = cli.StringFlag{
885885
Name: "da.blob.blobscan",
886886
Usage: "BlobScan blob api endpoint",
887-
Value: ethconfig.Defaults.DA.BlobScanAPIEndpoint,
888887
}
889888
DABlockNativeAPIEndpointFlag = cli.StringFlag{
890889
Name: "da.blob.blocknative",
891890
Usage: "BlockNative blob api endpoint",
892891
}
892+
DABeaconNodeAPIEndpointFlag = cli.StringFlag{
893+
Name: "da.blob.blobscan",
894+
Usage: "BlobScan blob api endpoint",
895+
}
893896
)
894897

895898
// MakeDataDir retrieves the currently requested data directory, terminating
@@ -1625,6 +1628,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
16251628
if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) {
16261629
cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name)
16271630
}
1631+
if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) {
1632+
cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name)
1633+
}
16281634
}
16291635
}
16301636

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package blob_client
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"encoding/json"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"path"
12+
"strconv"
13+
14+
"github.com/scroll-tech/go-ethereum/common"
15+
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
16+
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
17+
)
18+
19+
type BeaconNodeClient struct {
20+
apiEndpoint string
21+
l1Client *rollup_sync_service.L1Client
22+
genesisTime uint64
23+
secondsPerSlot uint64
24+
}
25+
26+
var (
27+
beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis"
28+
beaconNodeSpecEndpoint = "/eth/v1/config/spec"
29+
beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars"
30+
)
31+
32+
func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) {
33+
// get genesis time
34+
genesisPath := path.Join(apiEndpoint, beaconNodeGenesisEndpoint)
35+
resp, err := http.Get(genesisPath)
36+
if err != nil {
37+
return nil, fmt.Errorf("cannot do request, err: %w", err)
38+
}
39+
defer resp.Body.Close()
40+
if resp.StatusCode != http.StatusOK {
41+
body, _ := io.ReadAll(resp.Body)
42+
bodyStr := string(body)
43+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
44+
}
45+
var genesisResp GenesisResp
46+
err = json.NewDecoder(resp.Body).Decode(&genesisResp)
47+
if err != nil {
48+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
49+
}
50+
genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64)
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err)
53+
}
54+
55+
// get seconds per slot from spec
56+
specPath := path.Join(apiEndpoint, beaconNodeSpecEndpoint)
57+
resp, err = http.Get(specPath)
58+
if err != nil {
59+
return nil, fmt.Errorf("cannot do request, err: %w", err)
60+
}
61+
defer resp.Body.Close()
62+
if resp.StatusCode != http.StatusOK {
63+
body, _ := io.ReadAll(resp.Body)
64+
bodyStr := string(body)
65+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
66+
}
67+
var specResp SpecResp
68+
err = json.NewDecoder(resp.Body).Decode(&specResp)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
71+
}
72+
secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64)
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err)
75+
}
76+
if secondsPerSlot == 0 {
77+
return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0")
78+
}
79+
80+
return &BeaconNodeClient{
81+
apiEndpoint: apiEndpoint,
82+
l1Client: l1Client,
83+
genesisTime: genesisTime,
84+
secondsPerSlot: secondsPerSlot,
85+
}, nil
86+
}
87+
88+
func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
89+
// get block timestamp to calculate slot
90+
header, err := c.l1Client.GetHeaderByNumber(blockNumber)
91+
if err != nil {
92+
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
93+
}
94+
slot := (header.Time - c.genesisTime) / c.secondsPerSlot
95+
96+
// get blob sidecar for slot
97+
blobSidecarPath := path.Join(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot))
98+
resp, err := http.Get(blobSidecarPath)
99+
if err != nil {
100+
return nil, fmt.Errorf("cannot do request, err: %w", err)
101+
}
102+
defer resp.Body.Close()
103+
if resp.StatusCode != http.StatusOK {
104+
body, _ := io.ReadAll(resp.Body)
105+
bodyStr := string(body)
106+
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
107+
}
108+
var blobSidecarResp BlobSidecarResp
109+
err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp)
110+
if err != nil {
111+
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
112+
}
113+
114+
// find blob with desired versionedHash
115+
for _, blob := range blobSidecarResp.Data {
116+
// calculate blob hash from commitment and check it with desired
117+
commitmentBytes, err := hex.DecodeString(blob.KzgCommitment[2:])
118+
if err != nil {
119+
return nil, fmt.Errorf("failed to decode data to bytes, err: %w", err)
120+
}
121+
if len(commitmentBytes) != lenKzgCommitment {
122+
return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKzgCommitment, len(commitmentBytes))
123+
}
124+
commitment := kzg4844.Commitment(commitmentBytes)
125+
blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
126+
if blobVersionedHash == versionedHash {
127+
// found desired blob
128+
blobBytes, err := hex.DecodeString(blob.Blob[2:])
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to decode data to bytes, err: %w", err)
131+
}
132+
if len(blobBytes) != lenBlobBytes {
133+
return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes))
134+
}
135+
blob := kzg4844.Blob(blobBytes)
136+
return &blob, nil
137+
}
138+
}
139+
140+
return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber)
141+
}
142+
143+
type GenesisResp struct {
144+
Data struct {
145+
GenesisTime string `json:"genesis_time"`
146+
} `json:"data"`
147+
}
148+
149+
type SpecResp struct {
150+
Data struct {
151+
SecondsPerSlot string `json:"SECONDS_PER_SLOT"`
152+
} `json:"data"`
153+
}
154+
155+
type BlobSidecarResp struct {
156+
Data []struct {
157+
Index string `json:"index"`
158+
Blob string `json:"blob"`
159+
KzgCommitment string `json:"kzg_commitment"`
160+
KzgProof string `json:"kzg_proof"`
161+
SignedBlockHeader struct {
162+
Message struct {
163+
Slot string `json:"slot"`
164+
ProposerIndex string `json:"proposer_index"`
165+
ParentRoot string `json:"parent_root"`
166+
StateRoot string `json:"state_root"`
167+
BodyRoot string `json:"body_root"`
168+
} `json:"message"`
169+
Signature string `json:"signature"`
170+
} `json:"signed_block_header"`
171+
KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"`
172+
} `json:"data"`
173+
}

rollup/da_syncer/blob_client/blob_client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import (
88
)
99

1010
const (
11-
okStatusCode int = 200
12-
lenBlobBytes int = 131072
11+
lenBlobBytes int = 131072
12+
lenKzgCommitment int = 48
1313
)
1414

1515
type BlobClient interface {
16-
GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error)
16+
GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error)
1717
}

rollup/da_syncer/blob_client/blob_client_list.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ func NewBlobClientList(blobClients ...BlobClient) *BlobClientList {
2323
}
2424
}
2525

26-
func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
26+
func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2727
if len(c.list) == 0 {
2828
return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty")
2929
}
3030

3131
for i := 0; i < len(c.list); i++ {
32-
blob, err := c.list[c.nextPos()].GetBlobByVersionedHash(ctx, versionedHash)
32+
blob, err := c.list[c.nextPos()].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber)
3333
if err == nil {
3434
return blob, nil
3535
}

rollup/da_syncer/blob_client/blob_scan_client.go

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient {
2424
}
2525
}
2626

27-
func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
27+
func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2828
// blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId
2929
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
3030
if err != nil {
@@ -40,8 +40,8 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
4040
return nil, fmt.Errorf("cannot do request, err: %w", err)
4141
}
4242
defer resp.Body.Close()
43-
if resp.StatusCode != okStatusCode {
44-
if resp.StatusCode == 404 {
43+
if resp.StatusCode != http.StatusOK {
44+
if resp.StatusCode == http.StatusNotFound {
4545
return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String())
4646
}
4747
var res ErrorRespBlobScan
@@ -69,44 +69,10 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
6969
}
7070

7171
type BlobRespBlobScan struct {
72-
Commitment string `json:"commitment"`
73-
Proof string `json:"proof"`
74-
Size int `json:"size"`
75-
VersionedHash string `json:"versionedHash"`
76-
Data string `json:"data"`
77-
DataStorageReferences []struct {
78-
BlobStorage string `json:"blobStorage"`
79-
DataReference string `json:"dataReference"`
80-
} `json:"dataStorageReferences"`
81-
Transactions []struct {
82-
Hash string `json:"hash"`
83-
Index int `json:"index"`
84-
Block struct {
85-
Number int `json:"number"`
86-
BlobGasUsed string `json:"blobGasUsed"`
87-
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
88-
BlobGasPrice string `json:"blobGasPrice"`
89-
ExcessBlobGas string `json:"excessBlobGas"`
90-
Hash string `json:"hash"`
91-
Timestamp string `json:"timestamp"`
92-
Slot int `json:"slot"`
93-
} `json:"block"`
94-
From string `json:"from"`
95-
To string `json:"to"`
96-
MaxFeePerBlobGas string `json:"maxFeePerBlobGas"`
97-
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
98-
Rollup string `json:"rollup"`
99-
BlobAsCalldataGasFee string `json:"blobAsCalldataGasFee"`
100-
BlobGasBaseFee string `json:"blobGasBaseFee"`
101-
BlobGasMaxFee string `json:"blobGasMaxFee"`
102-
BlobGasUsed string `json:"blobGasUsed"`
103-
} `json:"transactions"`
72+
Data string `json:"data"`
10473
}
10574

10675
type ErrorRespBlobScan struct {
10776
Message string `json:"message"`
10877
Code string `json:"code"`
109-
Issues []struct {
110-
Message string `json:"message"`
111-
} `json:"issues"`
11278
}

rollup/da_syncer/blob_client/block_native_client.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient {
2222
}
2323
}
2424

25-
func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
25+
func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
2626
// blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive
2727
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
2828
if err != nil {
@@ -33,7 +33,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
3333
return nil, fmt.Errorf("cannot do request, err: %w", err)
3434
}
3535
defer resp.Body.Close()
36-
if resp.StatusCode != okStatusCode {
36+
if resp.StatusCode != http.StatusOK {
3737
var res ErrorRespBlockNative
3838
err = json.NewDecoder(resp.Body).Decode(&res)
3939
if err != nil {
@@ -59,12 +59,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
5959

6060
type BlobRespBlockNative struct {
6161
Blob struct {
62-
VersionedHash string `json:"versionedHash"`
63-
Commitment string `json:"commitment"`
64-
Proof string `json:"proof"`
65-
ZeroBytes int `json:"zeroBytes"`
66-
NonZeroBytes int `json:"nonZeroBytes"`
67-
Data string `json:"data"`
62+
Data string `json:"data"`
6863
} `json:"blob"`
6964
}
7065

rollup/da_syncer/da/commitV1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database
5555
return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err)
5656
}
5757

58-
blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash)
58+
blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber)
5959
if err != nil {
6060
return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err)
6161
}

rollup/da_syncer/syncing_pipeline.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Config struct {
2525
SnapshotFilePath string // path to snapshot file
2626
BlobScanAPIEndpoint string // BlobScan blob api endpoint
2727
BlockNativeAPIEndpoint string // BlockNative blob api endpoint
28+
BeaconNodeAPIEndpoint string // Beacon node api endpoint
2829
}
2930

3031
// SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into
@@ -55,6 +56,13 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
5556
}
5657

5758
blobClientList := blob_client.NewBlobClientList()
59+
if config.BeaconNodeAPIEndpoint != "" {
60+
beaconNodeClient, err := blob_client.NewBeaconNodeClient(config.BeaconNodeAPIEndpoint, l1Client)
61+
if err != nil {
62+
log.Warn("failed to create BeaconNodeClient", "err", err)
63+
}
64+
blobClientList.AddBlobClient(beaconNodeClient)
65+
}
5866
if config.BlobScanAPIEndpoint != "" {
5967
blobClientList.AddBlobClient(blob_client.NewBlobScanClient(config.BlobScanAPIEndpoint))
6068
}

0 commit comments

Comments
 (0)