Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/livepeer/starter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {

// flags
cfg.TestOrchAvail = fs.Bool("startupAvailabilityCheck", *cfg.TestOrchAvail, "Set to false to disable the startup Orchestrator availability check on the configured serviceAddr")
cfg.RemoteSigner = fs.Bool("remoteSigner", *cfg.RemoteSigner, "Set to true to run remote signer service")
cfg.RemoteSignerAddr = fs.String("remoteSignerAddr", *cfg.RemoteSignerAddr, "URL of remote signer service to use (e.g., http://localhost:8935). Gateway only.")

// Gateway metrics
cfg.KafkaBootstrapServers = fs.String("kafkaBootstrapServers", *cfg.KafkaBootstrapServers, "URL of Kafka Bootstrap Servers")
Expand Down
54 changes: 53 additions & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type LivepeerConfig struct {
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
RemoteSigner *bool
RemoteSignerAddr *string
AIRunnerImage *string
AIRunnerImageOverrides *string
AIVerboseLogs *bool
Expand Down Expand Up @@ -300,6 +302,8 @@ func DefaultLivepeerConfig() LivepeerConfig {

// Flags
defaultTestOrchAvail := true
defaultRemoteSigner := false
defaultRemoteSignerAddr := ""

// Gateway logs
defaultKafkaBootstrapServers := ""
Expand Down Expand Up @@ -419,7 +423,9 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,

// Flags
TestOrchAvail: &defaultTestOrchAvail,
TestOrchAvail: &defaultTestOrchAvail,
RemoteSigner: &defaultRemoteSigner,
RemoteSignerAddr: &defaultRemoteSignerAddr,

// Gateway logs
KafkaBootstrapServers: &defaultKafkaBootstrapServers,
Expand Down Expand Up @@ -676,8 +682,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}

// Validate remote signer mode
if *cfg.RemoteSigner {
if *cfg.Network == "offchain" {
exit("Remote signer mode requires on-chain network")
}
}

if *cfg.Redeemer {
n.NodeType = core.RedeemerNode
} else if *cfg.RemoteSigner {
n.NodeType = core.RemoteSignerNode
} else if *cfg.Orchestrator {
n.NodeType = core.OrchestratorNode
if !*cfg.Transcoder {
Expand Down Expand Up @@ -1564,6 +1579,32 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

bcast := core.NewBroadcaster(n)

// Populate infoSig with remote signer if configured
if *cfg.RemoteSignerAddr != "" {
url, err := url.Parse(*cfg.RemoteSignerAddr)
if err != nil {
glog.Exit("Invalid remote signer addr: ", err)
}

glog.Info("Retrieving OrchestratorInfo fields from remote signer: ", url)
fields, err := server.GetOrchInfoSig(url)
if err != nil {
glog.Exit("Unable to query remote signer: ", err)
}
n.RemoteSignerAddr = url
n.RemoteEthAddr = ethcommon.BytesToAddress(fields.Address)
n.InfoSig = fields.Signature
glog.Info("Using Ethereum address from remote signer: ", n.RemoteEthAddr)
} else {
// Use local signing
infoSig, err := bcast.Sign([]byte(fmt.Sprintf("%v", bcast.Address().Hex())))
if err != nil {
glog.Exit("Unable to generate info sig: ", err)
}
n.InfoSig = infoSig
}

orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist)
if *cfg.OrchPerfStatsURL != "" && *cfg.Region != "" {
glog.Infof("Using Performance Stats, region=%s, URL=%s, minPerfScore=%v", *cfg.Region, *cfg.OrchPerfStatsURL, *cfg.MinPerfScore)
Expand Down Expand Up @@ -1790,6 +1831,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}()
}

// Start remote signer server if in remote signer mode
if n.NodeType == core.RemoteSignerNode {
go func() {
glog.Info("Starting remote signer server on ", *cfg.HttpAddr)
err := server.StartRemoteSignerServer(s, *cfg.HttpAddr)
if err != nil {
exit("Error starting remote signer server: err=%q", err)
}
}()
}

go func() {
if core.OrchestratorNode != n.NodeType {
return
Expand Down
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeStatus struct {
type Broadcaster interface {
Address() ethcommon.Address
Sign([]byte) ([]byte, error)
OrchInfoSig() []byte
ExtraNodes() int
}

Expand Down
14 changes: 13 additions & 1 deletion core/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@ func (bcast *broadcaster) Sign(msg []byte) ([]byte, error) {
return bcast.node.Eth.Sign(crypto.Keccak256(msg))
}
func (bcast *broadcaster) Address() ethcommon.Address {
if bcast.node == nil || bcast.node.Eth == nil {
if bcast.node == nil {
return ethcommon.Address{}
}
if (bcast.node.RemoteEthAddr != ethcommon.Address{}) {
return bcast.node.RemoteEthAddr
}
if bcast.node.Eth == nil {
return ethcommon.Address{}
}
return bcast.node.Eth.Account().Address
}
func (bcast *broadcaster) OrchInfoSig() []byte {
if bcast == nil || bcast.node == nil {
return nil
}
return bcast.node.InfoSig
}
func (bcast *broadcaster) ExtraNodes() int {
if bcast == nil || bcast.node == nil {
return 0
Expand Down
9 changes: 9 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"

ethcommon "github.com/ethereum/go-ethereum/common"
)

var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable")
Expand All @@ -48,6 +50,7 @@ const (
TranscoderNode
RedeemerNode
AIWorkerNode
RemoteSignerNode
)

var nodeTypeStrs = map[NodeType]string{
Expand All @@ -57,6 +60,7 @@ var nodeTypeStrs = map[NodeType]string{
TranscoderNode: "transcoder",
RedeemerNode: "redeemer",
AIWorkerNode: "aiworker",
RemoteSignerNode: "remotesigner",
}

func (t NodeType) String() string {
Expand Down Expand Up @@ -144,6 +148,11 @@ type LivepeerNode struct {
Sender pm.Sender
ExtraNodes int

// Gateway fields for remote signers
RemoteSignerAddr *url.URL
RemoteEthAddr ethcommon.Address // eth address of the remote signer
InfoSig []byte // sig over eth address for the OrchestratorInfo request

// Thread safety for config fields
mu sync.RWMutex
StorageConfigs map[string]*transcodeConfig
Expand Down
1 change: 1 addition & 0 deletions discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type stubBroadcaster struct{}
func (s *stubBroadcaster) Sign(msg []byte) ([]byte, error) { return []byte{}, nil }
func (s *stubBroadcaster) Address() ethcommon.Address { return ethcommon.Address{} }
func (s *stubBroadcaster) ExtraNodes() int { return 0 }
func (s *stubBroadcaster) OrchInfoSig() []byte { return nil }

func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) {
assert := assert.New(t)
Expand Down
141 changes: 141 additions & 0 deletions server/remote_signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package server

import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/core"
)

// SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators
func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) {
ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID()))
remoteAddr := getRemoteAddr(r)
clog.Info(ctx, "Orch info signature request", "ip", remoteAddr)

// Get the broadcaster (signer)
// In remote signer mode, we may not have an OrchestratorPool, so create a broadcaster directly
gw := core.NewBroadcaster(ls.LivepeerNode)

// Create empty params for signing
params := GetOrchestratorInfoParams{}

// Generate the request (this creates the signature)
req, err := genOrchestratorReq(gw, params)
if err != nil {
clog.Errorf(ctx, "Failed to generate request: err=%q", err)
respondJsonError(ctx, w, err, http.StatusInternalServerError)
return
}

// Extract signature and format as hex
var (
signature = "0x" + hex.EncodeToString(req.Sig)
address = gw.Address().String()
)

results := map[string]string{
"address": address,
"signature": signature,
}

// Return JSON response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(results)
}

// StartRemoteSignerServer starts the HTTP server for remote signer mode
func StartRemoteSignerServer(ls *LivepeerServer, bind string) error {
// Register the remote signer endpoint
ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo))

// Start the HTTP server
glog.Info("Starting Remote Signer server on ", bind)
gw := core.NewBroadcaster(ls.LivepeerNode)
sig, err := gw.Sign([]byte(fmt.Sprintf("%v", gw.Address().Hex())))
if err != nil {
return err
}
ls.LivepeerNode.InfoSig = sig
srv := http.Server{
Addr: bind,
Handler: ls.HTTPMux,
IdleTimeout: HTTPIdleTimeout,
}
return srv.ListenAndServe()
}

// HexBytes represents a byte slice that marshals/unmarshals as hex with 0x prefix
type HexBytes []byte

func (h HexBytes) MarshalJSON() ([]byte, error) {
hexStr := "0x" + hex.EncodeToString(h)
return json.Marshal(hexStr)
}

func (h *HexBytes) UnmarshalJSON(data []byte) error {
var hexStr string
if err := json.Unmarshal(data, &hexStr); err != nil {
return err
}

// Remove 0x prefix if present
if len(hexStr) >= 2 && hexStr[:2] == "0x" {
hexStr = hexStr[2:]
}

// Decode hex string to bytes
decoded, err := hex.DecodeString(hexStr)
if err != nil {
return fmt.Errorf("invalid hex string: %v", err)
}

*h = decoded
return nil
}

// OrchInfoSigResponse represents the response from the remote signer
type OrchInfoSigResponse struct {
Address HexBytes `json:"address"`
Signature HexBytes `json:"signature"`
}

// Calls the remote signer service to get a signature for GetOrchInfo
func GetOrchInfoSig(remoteSignerHost *url.URL) (*OrchInfoSigResponse, error) {

url := remoteSignerHost.ResolveReference(&url.URL{Path: "/sign-orchestrator-info"})

// Create HTTP client with timeout
client := &http.Client{
Timeout: 30 * time.Second,
}

// Make the request
resp, err := client.Post(url.String(), "application/json", nil)
if err != nil {
return nil, fmt.Errorf("failed to call remote signer: %w", err)
}
defer resp.Body.Close()

// Check response status
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(body))
}

// Parse response
var signerResp OrchInfoSigResponse
if err := json.NewDecoder(resp.Body).Decode(&signerResp); err != nil {
return nil, fmt.Errorf("failed to parse remote signer response: %w", err)
}

return &signerResp, nil
}
6 changes: 1 addition & 5 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,7 @@ func startOrchestratorClient(ctx context.Context, uri *url.URL) (net.Orchestrato
}

func genOrchestratorReq(b common.Broadcaster, params GetOrchestratorInfoParams) (*net.OrchestratorRequest, error) {
sig, err := b.Sign([]byte(fmt.Sprintf("%v", b.Address().Hex())))
if err != nil {
return nil, err
}
return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: sig, Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil
return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: b.OrchInfoSig(), Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil
}

func genEndSessionRequest(sess *BroadcastSession) (*net.EndTranscodingSessionRequest, error) {
Expand Down
Loading