Skip to content

Commit d0892fa

Browse files
committed
poc e2e grpc communication
1 parent f1448e8 commit d0892fa

File tree

9 files changed

+228
-9
lines changed

9 files changed

+228
-9
lines changed

cmd/geth/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
172172
utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node)
173173
}
174174

175+
// Configure gRPC if requested.
176+
if ctx.IsSet(utils.GRPCEnabledFlag.Name) {
177+
utils.RegisterGRPCService(stack, backend, &cfg.Node)
178+
}
179+
175180
// Add the Ethereum Stats daemon if requested.
176181
if cfg.Ethstats.URL != "" {
177182
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)

cmd/geth/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ var (
180180
utils.RPCGlobalEVMTimeoutFlag,
181181
utils.RPCGlobalTxFeeCapFlag,
182182
utils.AllowUnprotectedTxs,
183+
utils.GRPCEnabledFlag,
184+
utils.GRPCHostFlag,
185+
utils.GRPCPortFlag,
183186
}
184187

185188
metricsFlags = []cli.Flag{

cmd/utils/flags.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,24 @@ var (
791791
Usage: "Enables the (deprecated) personal namespace",
792792
Category: flags.APICategory,
793793
}
794+
// grpc
795+
GRPCEnabledFlag = &cli.BoolFlag{
796+
Name: "grpc",
797+
Usage: "Enable the gRPC server",
798+
Category: flags.APICategory,
799+
}
800+
GRPCHostFlag = &cli.StringFlag{
801+
Name: "grpc.addr",
802+
Usage: "gRPC server listening interface",
803+
Value: node.DefaultGRPCHost,
804+
Category: flags.APICategory,
805+
}
806+
GRPCPortFlag = &cli.IntFlag{
807+
Name: "grpc.port",
808+
Usage: "gRPC server listening port",
809+
Value: node.DefaultGRPCPort,
810+
Category: flags.APICategory,
811+
}
794812

795813
// Network Settings
796814
MaxPeersFlag = &cli.IntFlag{
@@ -1211,6 +1229,19 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
12111229
}
12121230
}
12131231

1232+
// setGRCP creates the gRPC RPC listener interface string from the set command
1233+
// line flags, returning empty if the gRPC endpoint is disabled.
1234+
func setGRCP(ctx *cli.Context, cfg *node.Config) {
1235+
if ctx.Bool(GRPCEnabledFlag.Name) && cfg.GRPCHost == "" {
1236+
if ctx.IsSet(GRPCHostFlag.Name) {
1237+
cfg.GRPCHost = ctx.String(GRPCHostFlag.Name)
1238+
}
1239+
if ctx.IsSet(GRPCPortFlag.Name) {
1240+
cfg.GRPCPort = ctx.Int(GRPCPortFlag.Name)
1241+
}
1242+
}
1243+
}
1244+
12141245
// setGraphQL creates the GraphQL listener interface string from the set
12151246
// command line flags, returning empty if the GraphQL endpoint is disabled.
12161247
func setGraphQL(ctx *cli.Context, cfg *node.Config) {
@@ -1460,6 +1491,7 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
14601491
SetP2PConfig(ctx, &cfg.P2P)
14611492
setIPC(ctx, cfg)
14621493
setHTTP(ctx, cfg)
1494+
setGRCP(ctx, cfg)
14631495
setGraphQL(ctx, cfg)
14641496
setWS(ctx, cfg)
14651497
setNodeUserIdent(ctx, cfg)
@@ -2032,6 +2064,14 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst
20322064
}
20332065
}
20342066

2067+
// RegisterGRPCService adds the gRPC API to the node.
2068+
// It was done this way so that our grpc execution server can access the ethapi.Backend
2069+
func RegisterGRPCService(stack *node.Node, backend ethapi.Backend, cfg *node.Config) {
2070+
if err := node.NewGRPCServerHandler(stack, backend, cfg); err != nil {
2071+
Fatalf("Failed to register the gRPC service: %v", err)
2072+
}
2073+
}
2074+
20352075
// RegisterFilterAPI adds the eth log filtering RPC API to the node.
20362076
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
20372077
isLightClient := ethcfg.SyncMode == downloader.LightSync

grpc/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
This package provides a gRPC server as an entrypoint to the EVM.
2+
3+
Helpful commands (MacOS):
4+
```bash
5+
# install necessary dependencies
6+
brew install leveldb
7+
8+
# build geth
9+
make geth
10+
11+
# TODO - run beacon?
12+
13+
# run geth
14+
./build/bin/geth --http.api eth,web3,net,txpool --ws --ws.api eth,web3,net,txpool --grpc --grpc.addr="https://[::1]" --grpc.port 50051
15+
```

grpc/execution/server.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ package execution
77
import (
88
"context"
99

10+
"github.com/ethereum/go-ethereum/core/types"
1011
executionv1 "github.com/ethereum/go-ethereum/grpc/gen/proto/execution/v1"
11-
"google.golang.org/grpc"
12+
"github.com/ethereum/go-ethereum/internal/ethapi"
1213
)
1314

1415
// executionServiceServer is the implementation of the ExecutionServiceServer interface.
15-
type executionServiceServer struct {
16+
type ExecutionServiceServer struct {
1617
// NOTE - from the generated code:
1718
// All implementations must embed UnimplementedExecutionServiceServer
1819
// for forward compatibility
@@ -21,23 +22,42 @@ type executionServiceServer struct {
2122
// TODO - will need access to the consensus api to call functions for building a block
2223
// e.g. getPayload, newPayload, forkchoiceUpdated
2324

25+
Backend ethapi.Backend
26+
2427
// TODO - will need access to forkchoice on first run.
2528
// this will probably be passed in when calling NewServer
2629
}
2730

2831
// FIXME - how do we know which hash to start with? will probably need another api function like
2932
// GetHeadHash() to get the head hash of the forkchoice
3033

31-
func (s *executionServiceServer) DoBlock(ctx context.Context, req *executionv1.DoBlockRequest) (*executionv1.DoBlockResponse, error) {
32-
// Request.Header.ParentHash needs to match forkchoice head hash
34+
func (s *ExecutionServiceServer) DoBlock(ctx context.Context, req *executionv1.DoBlockRequest) (*executionv1.DoBlockResponse, error) {
35+
print("DoBlock called")
36+
37+
// NOTE - Request.Header.ParentHash needs to match forkchoice head hash
3338
// ParentHash should be the forkchoice head of the last block
3439

3540
// TODO - need to call consensus api to build a block
36-
return nil, nil
41+
42+
txs := bytesToTransactions(req.Transactions)
43+
for _, tx := range txs {
44+
s.Backend.SendTx(ctx, tx)
45+
}
46+
47+
res := &executionv1.DoBlockResponse{
48+
// TODO - get state root from last block
49+
StateRoot: []byte{0x00},
50+
}
51+
return res, nil
3752
}
3853

39-
func NewServer() *grpc.Server {
40-
server := grpc.NewServer()
41-
executionv1.RegisterExecutionServiceServer(server, &executionServiceServer{})
42-
return server
54+
// convert bytes to transactions
55+
func bytesToTransactions(b [][]byte) []*types.Transaction {
56+
txs := []*types.Transaction{}
57+
for _, txBytes := range b {
58+
tx := &types.Transaction{}
59+
tx.UnmarshalBinary(txBytes)
60+
txs = append(txs, tx)
61+
}
62+
return txs
4363
}

node/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ type Config struct {
189189
// Requests using ip address directly are not affected
190190
GraphQLVirtualHosts []string `toml:",omitempty"`
191191

192+
// GRPCHost is the host interface on which to start the gRPC server. If this
193+
// field is empty, no gRPC API endpoint will be started.
194+
GRPCHost string `toml:",omitempty"`
195+
// GRPCPort is the TCP port number on which to start the gRPC server.
196+
GRPCPort int `toml:",omitempty"`
197+
192198
// Logger is a custom logger to use with the p2p.Server.
193199
Logger log.Logger `toml:",omitempty"`
194200

@@ -260,12 +266,29 @@ func (c *Config) HTTPEndpoint() string {
260266
return fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort)
261267
}
262268

269+
// GRPCEndpoint resolves a gRPC endpoint based on the configured host interface
270+
// and port parameters.
271+
func (c *Config) GRPCEndpoint() string {
272+
if c.GRPCHost == "" {
273+
return ""
274+
}
275+
return fmt.Sprintf("%s:%d", c.GRPCHost, c.GRPCPort)
276+
}
277+
263278
// DefaultHTTPEndpoint returns the HTTP endpoint used by default.
264279
func DefaultHTTPEndpoint() string {
265280
config := &Config{HTTPHost: DefaultHTTPHost, HTTPPort: DefaultHTTPPort, AuthPort: DefaultAuthPort}
266281
return config.HTTPEndpoint()
267282
}
268283

284+
// DefaultGRPCEndpoint returns the gRPC endpoint used by default.
285+
// NOTE - implemented this to be consistent with DefaultHTTPEndpoint, but
286+
// neither are ever used
287+
func DefaultGRPCEndpoint() string {
288+
config := &Config{GRPCHost: DefaultGRPCHost, GRPCPort: DefaultGRPCPort}
289+
return config.GRPCEndpoint()
290+
}
291+
269292
// WSEndpoint resolves a websocket endpoint based on the configured host interface
270293
// and port parameters.
271294
func (c *Config) WSEndpoint() string {

node/defaults.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ const (
3636
DefaultGraphQLPort = 8547 // Default TCP port for the GraphQL server
3737
DefaultAuthHost = "localhost" // Default host interface for the authenticated apis
3838
DefaultAuthPort = 8551 // Default port for the authenticated apis
39+
// grpc
40+
DefaultGRPCHost = "[::1]" // Default host interface for the gRPC server
41+
DefaultGRPCPort = 50051 // Default port for the gRPC server
3942
)
4043

4144
var (
@@ -65,6 +68,9 @@ var DefaultConfig = Config{
6568
NAT: nat.Any(),
6669
},
6770
DBEngine: "",
71+
// grpc
72+
GRPCHost: DefaultGRPCHost,
73+
GRPCPort: DefaultGRPCPort,
6874
}
6975

7076
// DefaultDataDir is the default data directory to use for the databases and other

node/grpcstack.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package node
2+
3+
import (
4+
"net"
5+
6+
"github.com/ethereum/go-ethereum/grpc/execution"
7+
executionv1 "github.com/ethereum/go-ethereum/grpc/gen/proto/execution/v1"
8+
"github.com/ethereum/go-ethereum/internal/ethapi"
9+
"github.com/ethereum/go-ethereum/log"
10+
"google.golang.org/grpc"
11+
)
12+
13+
// GRPCServerHandler is the gRPC server handler.
14+
// It gives us a way to attach the gRPC server to the node so it can be stopped on shutdown.
15+
type GRPCServerHandler struct {
16+
endpoint string
17+
server *grpc.Server
18+
executionServiceServer *execution.ExecutionServiceServer
19+
}
20+
21+
// NewServer creates a new gRPC server.
22+
// It registers the execution service server.
23+
// It registers the gRPC server with the node so it can be stopped on shutdown.
24+
func NewGRPCServerHandler(node *Node, backend ethapi.Backend, cfg *Config) error {
25+
server := grpc.NewServer()
26+
27+
executionServiceServer := &execution.ExecutionServiceServer{
28+
Backend: backend,
29+
}
30+
31+
log.Info("gRPC server enabled", "endpoint", cfg.GRPCEndpoint())
32+
33+
serverHandler := &GRPCServerHandler{
34+
endpoint: cfg.GRPCEndpoint(),
35+
server: server,
36+
executionServiceServer: executionServiceServer,
37+
}
38+
39+
executionv1.RegisterExecutionServiceServer(server, executionServiceServer)
40+
41+
node.RegisterGRPCServer(serverHandler)
42+
return nil
43+
}
44+
45+
// Start starts the gRPC server if it is enabled.
46+
func (handler *GRPCServerHandler) Start() error {
47+
if handler.endpoint == "" {
48+
return nil
49+
}
50+
51+
// Start the gRPC server
52+
lis, err := net.Listen("tcp", handler.endpoint)
53+
if err != nil {
54+
return err
55+
}
56+
go handler.server.Serve(lis)
57+
log.Info("gRPC server started", "endpoint", handler.endpoint)
58+
return nil
59+
}
60+
61+
// Stop stops the gRPC server.
62+
func (handler *GRPCServerHandler) Stop() error {
63+
handler.server.Stop()
64+
log.Info("gRPC server stopped", "endpoint", handler.endpoint)
65+
return nil
66+
}

node/node.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ type Node struct {
6464
ipc *ipcServer // Stores information about the ipc http server
6565
inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
6666

67+
// grpc
68+
grpcServerHandler *GRPCServerHandler // Stores information about the grpc server
69+
6770
databases map[*closeTrackingDB]struct{} // All open databases
6871
}
6972

@@ -274,6 +277,11 @@ func (n *Node) openEndpoints() error {
274277
n.stopRPC()
275278
n.server.Stop()
276279
}
280+
// start GRPC endpoints
281+
err = n.startGRPC()
282+
if err != nil {
283+
n.stopGRPC()
284+
}
277285
return err
278286
}
279287

@@ -521,6 +529,21 @@ func (n *Node) stopRPC() {
521529
n.stopInProc()
522530
}
523531

532+
func (n *Node) startGRPC() error {
533+
if n.config.GRPCHost != "" {
534+
// start the server
535+
if err := n.grpcServerHandler.Start(); err != nil {
536+
return err
537+
}
538+
}
539+
540+
return nil
541+
}
542+
543+
func (n *Node) stopGRPC() {
544+
n.grpcServerHandler.Stop()
545+
}
546+
524547
// startInProc registers all RPC APIs on the inproc server.
525548
func (n *Node) startInProc(apis []rpc.API) error {
526549
for _, api := range apis {
@@ -588,6 +611,19 @@ func (n *Node) getAPIs() (unauthenticated, all []rpc.API) {
588611
return unauthenticated, n.rpcAPIs
589612
}
590613

614+
// RegisterGRPCServer registers a gRPC server on the node.
615+
// This allows us to control grpc server startup and shutdown from the node.
616+
func (n *Node) RegisterGRPCServer(handler *GRPCServerHandler) {
617+
n.lock.Lock()
618+
defer n.lock.Unlock()
619+
620+
if n.state != initializingState {
621+
panic("can't register gRPC server on running/stopped node")
622+
}
623+
624+
n.grpcServerHandler = handler
625+
}
626+
591627
// RegisterHandler mounts a handler on the given path on the canonical HTTP server.
592628
//
593629
// The name of the handler is shown in a log message when the HTTP server starts
@@ -667,6 +703,11 @@ func (n *Node) HTTPEndpoint() string {
667703
return "http://" + n.http.listenAddr()
668704
}
669705

706+
// GRPCENDPOINT returns the URL of the GRPC server.
707+
func (n *Node) GRPCEndpoint() string {
708+
return "http://" + n.grpcServerHandler.endpoint
709+
}
710+
670711
// WSEndpoint returns the current JSON-RPC over WebSocket endpoint.
671712
func (n *Node) WSEndpoint() string {
672713
if n.http.wsAllowed() {

0 commit comments

Comments
 (0)