diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go index 37df7ce5d1..3ad62ccff3 100644 --- a/cmd/livepeer/starter/flags.go +++ b/cmd/livepeer/starter/flags.go @@ -138,6 +138,8 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig { // flags cfg.TestOrchAvail = fs.Bool("startupAvailabilityCheck", *cfg.TestOrchAvail, "Set to false to disable the startup Orchestrator availability check on the configured serviceAddr") + cfg.RemoteSigner = fs.Bool("remoteSigner", *cfg.RemoteSigner, "Set to true to run remote signer service") + cfg.RemoteSignerAddr = fs.String("remoteSignerAddr", *cfg.RemoteSignerAddr, "URL of remote signer service to use (e.g., http://localhost:8935). Gateway only.") // Gateway metrics cfg.KafkaBootstrapServers = fs.String("kafkaBootstrapServers", *cfg.KafkaBootstrapServers, "URL of Kafka Bootstrap Servers") diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 8216753c45..981d52749f 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -167,6 +167,8 @@ type LivepeerConfig struct { OrchBlacklist *string OrchMinLivepeerVersion *string TestOrchAvail *bool + RemoteSigner *bool + RemoteSignerAddr *string AIRunnerImage *string AIRunnerImageOverrides *string AIVerboseLogs *bool @@ -302,6 +304,8 @@ func DefaultLivepeerConfig() LivepeerConfig { // Flags defaultTestOrchAvail := true + defaultRemoteSigner := false + defaultRemoteSignerAddr := "" // Gateway logs defaultKafkaBootstrapServers := "" @@ -422,7 +426,9 @@ func DefaultLivepeerConfig() LivepeerConfig { OrchMinLivepeerVersion: &defaultMinLivepeerVersion, // Flags - TestOrchAvail: &defaultTestOrchAvail, + TestOrchAvail: &defaultTestOrchAvail, + RemoteSigner: &defaultRemoteSigner, + RemoteSignerAddr: &defaultRemoteSignerAddr, // Gateway logs KafkaBootstrapServers: &defaultKafkaBootstrapServers, @@ -679,8 +685,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } } + // Validate remote signer mode + if *cfg.RemoteSigner { + if *cfg.Network == "offchain" { + exit("Remote signer mode requires on-chain network") + } + } + if *cfg.Redeemer { n.NodeType = core.RedeemerNode + } else if *cfg.RemoteSigner { + n.NodeType = core.RemoteSignerNode } else if *cfg.Orchestrator { n.NodeType = core.OrchestratorNode if !*cfg.Transcoder { @@ -1567,6 +1582,32 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } bcast := core.NewBroadcaster(n) + + // Populate infoSig with remote signer if configured + if *cfg.RemoteSignerAddr != "" { + url, err := url.Parse(*cfg.RemoteSignerAddr) + if err != nil { + glog.Exit("Invalid remote signer addr: ", err) + } + + glog.Info("Retrieving OrchestratorInfo fields from remote signer: ", url) + fields, err := server.GetOrchInfoSig(url) + if err != nil { + glog.Exit("Unable to query remote signer: ", err) + } + n.RemoteSignerAddr = url + n.RemoteEthAddr = ethcommon.BytesToAddress(fields.Address) + n.InfoSig = fields.Signature + glog.Info("Using Ethereum address from remote signer: ", n.RemoteEthAddr) + } else { + // Use local signing + infoSig, err := bcast.Sign([]byte(fmt.Sprintf("%v", bcast.Address().Hex()))) + if err != nil { + glog.Exit("Unable to generate info sig: ", err) + } + n.InfoSig = infoSig + } + orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist) if *cfg.OrchPerfStatsURL != "" && *cfg.Region != "" { glog.Infof("Using Performance Stats, region=%s, URL=%s, minPerfScore=%v", *cfg.Region, *cfg.OrchPerfStatsURL, *cfg.MinPerfScore) @@ -1793,6 +1834,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { }() } + // Start remote signer server if in remote signer mode + if n.NodeType == core.RemoteSignerNode { + go func() { + glog.Info("Starting remote signer server on ", *cfg.HttpAddr) + err := server.StartRemoteSignerServer(s, *cfg.HttpAddr) + if err != nil { + exit("Error starting remote signer server: err=%q", err) + } + }() + } + go func() { if core.OrchestratorNode != n.NodeType { return diff --git a/common/types.go b/common/types.go index d5ea3da795..d6a879cf5b 100644 --- a/common/types.go +++ b/common/types.go @@ -45,6 +45,7 @@ type NodeStatus struct { type Broadcaster interface { Address() ethcommon.Address Sign([]byte) ([]byte, error) + OrchInfoSig() []byte ExtraNodes() int } diff --git a/core/broadcaster.go b/core/broadcaster.go index d89c103505..bb9bcca5ca 100644 --- a/core/broadcaster.go +++ b/core/broadcaster.go @@ -18,11 +18,23 @@ func (bcast *broadcaster) Sign(msg []byte) ([]byte, error) { return bcast.node.Eth.Sign(crypto.Keccak256(msg)) } func (bcast *broadcaster) Address() ethcommon.Address { - if bcast.node == nil || bcast.node.Eth == nil { + if bcast.node == nil { + return ethcommon.Address{} + } + if (bcast.node.RemoteEthAddr != ethcommon.Address{}) { + return bcast.node.RemoteEthAddr + } + if bcast.node.Eth == nil { return ethcommon.Address{} } return bcast.node.Eth.Account().Address } +func (bcast *broadcaster) OrchInfoSig() []byte { + if bcast == nil || bcast.node == nil { + return nil + } + return bcast.node.InfoSig +} func (bcast *broadcaster) ExtraNodes() int { if bcast == nil || bcast.node == nil { return 0 diff --git a/core/livepeernode.go b/core/livepeernode.go index 4e61a16f15..a21ec5a938 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -26,6 +26,8 @@ import ( "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/eth" lpmon "github.com/livepeer/go-livepeer/monitor" + + ethcommon "github.com/ethereum/go-ethereum/common" ) var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable") @@ -48,6 +50,7 @@ const ( TranscoderNode RedeemerNode AIWorkerNode + RemoteSignerNode ) var nodeTypeStrs = map[NodeType]string{ @@ -57,6 +60,7 @@ var nodeTypeStrs = map[NodeType]string{ TranscoderNode: "transcoder", RedeemerNode: "redeemer", AIWorkerNode: "aiworker", + RemoteSignerNode: "remotesigner", } func (t NodeType) String() string { @@ -144,6 +148,11 @@ type LivepeerNode struct { Sender pm.Sender ExtraNodes int + // Gateway fields for remote signers + RemoteSignerAddr *url.URL + RemoteEthAddr ethcommon.Address // eth address of the remote signer + InfoSig []byte // sig over eth address for the OrchestratorInfo request + // Thread safety for config fields mu sync.RWMutex StorageConfigs map[string]*transcodeConfig diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 7301ae9731..7b38bd49ad 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -36,6 +36,7 @@ type stubBroadcaster struct{} func (s *stubBroadcaster) Sign(msg []byte) ([]byte, error) { return []byte{}, nil } func (s *stubBroadcaster) Address() ethcommon.Address { return ethcommon.Address{} } func (s *stubBroadcaster) ExtraNodes() int { return 0 } +func (s *stubBroadcaster) OrchInfoSig() []byte { return nil } func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) { assert := assert.New(t) diff --git a/server/remote_signer.go b/server/remote_signer.go new file mode 100644 index 0000000000..bc4b682656 --- /dev/null +++ b/server/remote_signer.go @@ -0,0 +1,141 @@ +package server + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/golang/glog" + "github.com/livepeer/go-livepeer/clog" + "github.com/livepeer/go-livepeer/core" +) + +// SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators +func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) { + ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID())) + remoteAddr := getRemoteAddr(r) + clog.Info(ctx, "Orch info signature request", "ip", remoteAddr) + + // Get the broadcaster (signer) + // In remote signer mode, we may not have an OrchestratorPool, so create a broadcaster directly + gw := core.NewBroadcaster(ls.LivepeerNode) + + // Create empty params for signing + params := GetOrchestratorInfoParams{} + + // Generate the request (this creates the signature) + req, err := genOrchestratorReq(gw, params) + if err != nil { + clog.Errorf(ctx, "Failed to generate request: err=%q", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + // Extract signature and format as hex + var ( + signature = "0x" + hex.EncodeToString(req.Sig) + address = gw.Address().String() + ) + + results := map[string]string{ + "address": address, + "signature": signature, + } + + // Return JSON response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(results) +} + +// StartRemoteSignerServer starts the HTTP server for remote signer mode +func StartRemoteSignerServer(ls *LivepeerServer, bind string) error { + // Register the remote signer endpoint + ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo)) + + // Start the HTTP server + glog.Info("Starting Remote Signer server on ", bind) + gw := core.NewBroadcaster(ls.LivepeerNode) + sig, err := gw.Sign([]byte(fmt.Sprintf("%v", gw.Address().Hex()))) + if err != nil { + return err + } + ls.LivepeerNode.InfoSig = sig + srv := http.Server{ + Addr: bind, + Handler: ls.HTTPMux, + IdleTimeout: HTTPIdleTimeout, + } + return srv.ListenAndServe() +} + +// HexBytes represents a byte slice that marshals/unmarshals as hex with 0x prefix +type HexBytes []byte + +func (h HexBytes) MarshalJSON() ([]byte, error) { + hexStr := "0x" + hex.EncodeToString(h) + return json.Marshal(hexStr) +} + +func (h *HexBytes) UnmarshalJSON(data []byte) error { + var hexStr string + if err := json.Unmarshal(data, &hexStr); err != nil { + return err + } + + // Remove 0x prefix if present + if len(hexStr) >= 2 && hexStr[:2] == "0x" { + hexStr = hexStr[2:] + } + + // Decode hex string to bytes + decoded, err := hex.DecodeString(hexStr) + if err != nil { + return fmt.Errorf("invalid hex string: %v", err) + } + + *h = decoded + return nil +} + +// OrchInfoSigResponse represents the response from the remote signer +type OrchInfoSigResponse struct { + Address HexBytes `json:"address"` + Signature HexBytes `json:"signature"` +} + +// Calls the remote signer service to get a signature for GetOrchInfo +func GetOrchInfoSig(remoteSignerHost *url.URL) (*OrchInfoSigResponse, error) { + + url := remoteSignerHost.ResolveReference(&url.URL{Path: "/sign-orchestrator-info"}) + + // Create HTTP client with timeout + client := &http.Client{ + Timeout: 30 * time.Second, + } + + // Make the request + resp, err := client.Post(url.String(), "application/json", nil) + if err != nil { + return nil, fmt.Errorf("failed to call remote signer: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var signerResp OrchInfoSigResponse + if err := json.NewDecoder(resp.Body).Decode(&signerResp); err != nil { + return nil, fmt.Errorf("failed to parse remote signer response: %w", err) + } + + return &signerResp, nil +} diff --git a/server/rpc.go b/server/rpc.go index 5bcb1264ee..0a94539611 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -363,11 +363,7 @@ func startOrchestratorClient(ctx context.Context, uri *url.URL) (net.Orchestrato } func genOrchestratorReq(b common.Broadcaster, params GetOrchestratorInfoParams) (*net.OrchestratorRequest, error) { - sig, err := b.Sign([]byte(fmt.Sprintf("%v", b.Address().Hex()))) - if err != nil { - return nil, err - } - return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: sig, Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil + return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: b.OrchInfoSig(), Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil } func genEndSessionRequest(sess *BroadcastSession) (*net.EndTranscodingSessionRequest, error) { diff --git a/server/rpc_test.go b/server/rpc_test.go index 1c2fd52699..3aaae60fad 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -284,6 +284,10 @@ func (r *stubOrchestrator) GetUrlForCapability(capability string) string { func (r *stubOrchestrator) ExtraNodes() int { return 0 } +func (r *stubOrchestrator) OrchInfoSig() []byte { + b, _ := r.Sign([]byte(r.Address().Hex())) + return b +} func stubBroadcaster2() *stubOrchestrator { return newStubOrchestrator() // lazy; leverage subtyping for interface commonalities @@ -323,13 +327,6 @@ func TestRPCTranscoderReq(t *testing.T) { t.Errorf("Expected %v; got %v", o.sessCapErr, err) } o.sessCapErr = nil - - // error signing - b.signErr = fmt.Errorf("Signing error") - _, err = genOrchestratorReq(b, GetOrchestratorInfoParams{}) - if err == nil { - t.Error("Did not expect to generate a orchestrator request with invalid address") - } } func TestRPCSeg(t *testing.T) {