Skip to content

Commit 378cc05

Browse files
committed
refactor: Refactor msgmux.Router to use actor pkg
1 parent 226dc4e commit 378cc05

File tree

3 files changed

+165
-138
lines changed

3 files changed

+165
-138
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ require (
6464
pgregory.net/rapid v1.2.0
6565
)
6666

67+
require github.com/lightningnetwork/lnd/actor v0.0.2 // indirect
68+
6769
require (
6870
dario.cat/mergo v1.0.1 // indirect
6971
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3
370370
github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
371371
github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display h1:Y2WiPkBS/00EiEg0qp0FhehxnQfk3vv8U6Xt3nN+rTY=
372372
github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
373+
github.com/lightningnetwork/lnd/actor v0.0.2 h1:+1Tz715d1iC2WB3Y+qgP4ycRC2HTj/JPOBqK7753dNc=
374+
github.com/lightningnetwork/lnd/actor v0.0.2/go.mod h1:RKgQPYHLVHuIRdMF/Q8+NPV2Nva3F66ZsULD2+TskKc=
373375
github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf0d0Uy4qBjI=
374376
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
375377
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=

msgmux/msg_router.go

Lines changed: 161 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"context"
55
"fmt"
66
"maps"
7-
"sync"
87

98
"github.com/btcsuite/btcd/btcec/v2"
9+
"github.com/lightningnetwork/lnd/actor"
1010
"github.com/lightningnetwork/lnd/fn/v2"
1111
"github.com/lightningnetwork/lnd/lnwire"
1212
)
@@ -73,196 +73,219 @@ type Router interface {
7373
Stop()
7474
}
7575

76-
// sendQuery sends a query to the main event loop, and returns the response.
77-
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
78-
quit chan struct{}) fn.Result[R] {
76+
// EndpointsMap is a map of all registered endpoints.
77+
type EndpointsMap map[EndpointName]Endpoint
7978

80-
query, respChan := fn.NewReq[Q, R](queryArg)
79+
// routerActorMsg is a union interface for all router actor messages.
80+
type routerActorMsg interface {
81+
actor.Message
82+
isRouterActorMsg()
83+
}
8184

82-
if !fn.SendOrQuit(sendChan, query, quit) {
83-
return fn.Errf[R]("router shutting down")
84-
}
85+
// registerEndpointMsg is a message to register an endpoint.
86+
type registerEndpointMsg struct {
87+
actor.BaseMessage
88+
endpoint Endpoint
89+
}
8590

86-
return fn.NewResult(fn.RecvResp(respChan, nil, quit))
91+
// MessageType returns the type name of the message.
92+
func (m *registerEndpointMsg) MessageType() string { return "registerEndpointMsg" }
93+
func (m *registerEndpointMsg) isRouterActorMsg() {}
94+
95+
// unregisterEndpointMsg is a message to unregister an endpoint.
96+
type unregisterEndpointMsg struct {
97+
actor.BaseMessage
98+
name EndpointName
8799
}
88100

89-
// sendQueryErr is a helper function based on sendQuery that can be used when
90-
// the query only needs an error response.
91-
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
92-
quitChan chan struct{}) error {
101+
// MessageType returns the type name of the message.
102+
func (m *unregisterEndpointMsg) MessageType() string { return "unregisterEndpointMsg" }
103+
func (m *unregisterEndpointMsg) isRouterActorMsg() {}
93104

94-
return fn.ElimEither(
95-
sendQuery(sendChan, queryArg, quitChan).Either,
96-
fn.Iden, fn.Iden,
97-
)
105+
// routeMsg is a message to route a peer message.
106+
type routeMsg struct {
107+
actor.BaseMessage
108+
peerMsg PeerMsg
98109
}
99110

100-
// EndpointsMap is a map of all registered endpoints.
101-
type EndpointsMap map[EndpointName]Endpoint
111+
// MessageType returns the type name of the message.
112+
func (m *routeMsg) MessageType() string { return "routeMsg" }
113+
func (m *routeMsg) isRouterActorMsg() {}
102114

103-
// MultiMsgRouter is a type of message router that is capable of routing new
104-
// incoming messages, permitting a message to be routed to multiple registered
105-
// endpoints.
106-
type MultiMsgRouter struct {
107-
startOnce sync.Once
108-
stopOnce sync.Once
115+
// getEndpointsMsg is a message to get all endpoints.
116+
type getEndpointsMsg struct {
117+
actor.BaseMessage
118+
}
119+
120+
// MessageType returns the type name of the message.
121+
func (m *getEndpointsMsg) MessageType() string { return "getEndpointsMsg" }
122+
func (m *getEndpointsMsg) isRouterActorMsg() {}
123+
124+
// routerBehavior is the actor behavior for the message router.
125+
type routerBehavior struct {
126+
endpoints map[EndpointName]Endpoint
127+
}
128+
129+
// Receive handles incoming messages for the router actor.
130+
func (b *routerBehavior) Receive(ctx context.Context,
131+
msg routerActorMsg) fn.Result[any] {
132+
133+
switch m := msg.(type) {
134+
case *registerEndpointMsg:
135+
log.Infof("MsgRouter: registering new Endpoint(%s)",
136+
m.endpoint.Name())
109137

110-
// registerChan is the channel that all new endpoints will be sent to.
111-
registerChan chan fn.Req[Endpoint, error]
138+
if _, ok := b.endpoints[m.endpoint.Name()]; ok {
139+
log.Errorf("MsgRouter: rejecting duplicate endpoint: %v",
140+
m.endpoint.Name())
141+
return fn.Ok[any](ErrDuplicateEndpoint)
142+
}
143+
b.endpoints[m.endpoint.Name()] = m.endpoint
144+
return fn.Ok[any](nil) // nil error
145+
146+
case *unregisterEndpointMsg:
147+
log.Infof("MsgRouter: unregistering Endpoint(%s)", m.name)
148+
delete(b.endpoints, m.name)
149+
return fn.Ok[any](nil) // nil error
150+
151+
case *routeMsg:
152+
var couldSend bool
153+
for _, endpoint := range b.endpoints {
154+
if endpoint.CanHandle(m.peerMsg) {
155+
log.Tracef("MsgRouter: sending msg %T to endpoint %s",
156+
m.peerMsg, endpoint.Name())
157+
158+
sent := endpoint.SendMessage(ctx, m.peerMsg)
159+
couldSend = couldSend || sent
160+
}
161+
}
112162

113-
// unregisterChan is the channel that all endpoints that are to be
114-
// removed are sent to.
115-
unregisterChan chan fn.Req[EndpointName, error]
163+
var err error
164+
if !couldSend {
165+
log.Tracef("MsgRouter: unable to route msg %T",
166+
m.peerMsg.Message)
167+
err = ErrUnableToRouteMsg
168+
}
169+
return fn.Ok[any](err)
116170

117-
// msgChan is the channel that all messages will be sent to for
118-
// processing.
119-
msgChan chan fn.Req[PeerMsg, error]
171+
case *getEndpointsMsg:
172+
endpointsCopy := make(EndpointsMap, len(b.endpoints))
173+
maps.Copy(b.endpoints, endpointsCopy)
174+
return fn.Ok[any](endpointsCopy)
120175

121-
// endpointsQueries is a channel that all queries to the endpoints map
122-
// will be sent to.
123-
endpointQueries chan fn.Req[Endpoint, EndpointsMap]
176+
default:
177+
return fn.Err[any](fmt.Errorf("unknown message type: %T", m))
178+
}
179+
}
124180

125-
wg sync.WaitGroup
126-
quit chan struct{}
181+
// MultiMsgRouter is a type of message router that is capable of routing new
182+
// incoming messages, permitting a message to be routed to multiple registered
183+
// endpoints.
184+
type MultiMsgRouter struct {
185+
actor *actor.Actor[routerActorMsg, any]
127186
}
128187

129188
// NewMultiMsgRouter creates a new instance of a peer message router.
130189
func NewMultiMsgRouter() *MultiMsgRouter {
190+
beh := &routerBehavior{
191+
endpoints: make(map[EndpointName]Endpoint),
192+
}
193+
actorCfg := actor.ActorConfig[routerActorMsg, any]{
194+
ID: "msg-router",
195+
Behavior: beh,
196+
MailboxSize: 100,
197+
}
198+
routerActor := actor.NewActor(actorCfg)
199+
131200
return &MultiMsgRouter{
132-
registerChan: make(chan fn.Req[Endpoint, error]),
133-
unregisterChan: make(chan fn.Req[EndpointName, error]),
134-
msgChan: make(chan fn.Req[PeerMsg, error]),
135-
endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
136-
quit: make(chan struct{}),
201+
actor: routerActor,
137202
}
138203
}
139204

140205
// Start starts the peer message router.
141206
func (p *MultiMsgRouter) Start(ctx context.Context) {
142207
log.Infof("Starting Router")
143208

144-
p.startOnce.Do(func() {
145-
p.wg.Add(1)
146-
go p.msgRouter(ctx)
147-
})
209+
p.actor.Start()
148210
}
149211

150212
// Stop stops the peer message router.
151213
func (p *MultiMsgRouter) Stop() {
152214
log.Infof("Stopping Router")
153215

154-
p.stopOnce.Do(func() {
155-
close(p.quit)
156-
p.wg.Wait()
157-
})
216+
p.actor.Stop()
158217
}
159218

160219
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
161220
// endpoint exists, an error is returned.
162221
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
163-
return sendQueryErr(p.registerChan, endpoint, p.quit)
164-
}
222+
msg := &registerEndpointMsg{endpoint: endpoint}
223+
future := p.actor.Ref().Ask(context.Background(), msg)
224+
res := future.Await(context.Background())
165225

166-
// UnregisterEndpoint unregisters the target endpoint from the router.
167-
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
168-
return sendQueryErr(p.unregisterChan, name, p.quit)
169-
}
226+
val, err := res.Unpack()
227+
if err != nil {
228+
return err
229+
}
170230

171-
// RouteMsg attempts to route the target message to a registered endpoint. If
172-
// ANY endpoint could handle the message, then nil is returned.
173-
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
174-
return sendQueryErr(p.msgChan, msg, p.quit)
175-
}
231+
if typedErr, ok := val.(error); ok {
232+
return typedErr
233+
}
176234

177-
// Endpoints returns a list of all registered endpoints.
178-
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
179-
return sendQuery(p.endpointQueries, nil, p.quit)
235+
return nil
180236
}
181237

182-
// msgRouter is the main goroutine that handles all incoming messages.
183-
func (p *MultiMsgRouter) msgRouter(ctx context.Context) {
184-
defer p.wg.Done()
185-
186-
// endpoints is a map of all registered endpoints.
187-
endpoints := make(map[EndpointName]Endpoint)
188-
189-
for {
190-
select {
191-
// A new endpoint was just sent in, so we'll add it to our set
192-
// of registered endpoints.
193-
case newEndpointMsg := <-p.registerChan:
194-
endpoint := newEndpointMsg.Request
195-
196-
log.Infof("MsgRouter: registering new "+
197-
"Endpoint(%s)", endpoint.Name())
198-
199-
// If this endpoint already exists, then we'll return
200-
// an error as we require unique names.
201-
if _, ok := endpoints[endpoint.Name()]; ok {
202-
log.Errorf("MsgRouter: rejecting "+
203-
"duplicate endpoint: %v",
204-
endpoint.Name())
205-
206-
newEndpointMsg.Resolve(ErrDuplicateEndpoint)
207-
208-
continue
209-
}
210-
211-
endpoints[endpoint.Name()] = endpoint
212-
213-
newEndpointMsg.Resolve(nil)
214-
215-
// A request to unregister an endpoint was just sent in, so
216-
// we'll attempt to remove it.
217-
case endpointName := <-p.unregisterChan:
218-
delete(endpoints, endpointName.Request)
219-
220-
log.Infof("MsgRouter: unregistering "+
221-
"Endpoint(%s)", endpointName.Request)
238+
// UnregisterEndpoint unregisters the target endpoint from the router.
239+
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
240+
msg := &unregisterEndpointMsg{name: name}
241+
future := p.actor.Ref().Ask(context.Background(), msg)
242+
res := future.Await(context.Background())
222243

223-
endpointName.Resolve(nil)
244+
val, err := res.Unpack()
245+
if err != nil {
246+
return err
247+
}
224248

225-
// A new message was just sent in. We'll attempt to route it to
226-
// all the endpoints that can handle it.
227-
case msgQuery := <-p.msgChan:
228-
msg := msgQuery.Request
249+
if typedErr, ok := val.(error); ok {
250+
return typedErr
251+
}
229252

230-
// Loop through all the endpoints and send the message
231-
// to those that can handle it the message.
232-
var couldSend bool
233-
for _, endpoint := range endpoints {
234-
if endpoint.CanHandle(msg) {
235-
log.Tracef("MsgRouter: sending "+
236-
"msg %T to endpoint %s", msg,
237-
endpoint.Name())
253+
return nil
254+
}
238255

239-
sent := endpoint.SendMessage(ctx, msg)
240-
couldSend = couldSend || sent
241-
}
242-
}
256+
// RouteMsg attempts to route the target message to a registered endpoint. If
257+
// ANY endpoint could handle the message, then nil is returned.
258+
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
259+
future := p.actor.Ref().Ask(context.Background(), &routeMsg{peerMsg: msg})
260+
res := future.Await(context.Background())
243261

244-
var err error
245-
if !couldSend {
246-
log.Tracef("MsgRouter: unable to route "+
247-
"msg %T", msg.Message)
262+
val, err := res.Unpack()
263+
if err != nil {
264+
return err
265+
}
248266

249-
err = ErrUnableToRouteMsg
250-
}
267+
if typedErr, ok := val.(error); ok {
268+
return typedErr
269+
}
251270

252-
msgQuery.Resolve(err)
271+
return nil
272+
}
253273

254-
// A query for the endpoint state just came in, we'll send back
255-
// a copy of our current state.
256-
case endpointQuery := <-p.endpointQueries:
257-
endpointsCopy := make(EndpointsMap, len(endpoints))
258-
maps.Copy(endpointsCopy, endpoints)
274+
// Endpoints returns a list of all registered endpoints.
275+
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
276+
future := p.actor.Ref().Ask(context.Background(), &getEndpointsMsg{})
277+
res := future.Await(context.Background())
259278

260-
endpointQuery.Resolve(endpointsCopy)
279+
val, err := res.Unpack()
280+
if err != nil {
281+
return fn.Err[EndpointsMap](err)
282+
}
261283

262-
case <-p.quit:
263-
return
264-
}
284+
if endpoints, ok := val.(EndpointsMap); ok {
285+
return fn.Ok(endpoints)
265286
}
287+
288+
return fn.Err[EndpointsMap](fmt.Errorf("unexpected response type: %T", val))
266289
}
267290

268291
// A compile time check to ensure MultiMsgRouter implements the MsgRouter

0 commit comments

Comments
 (0)