Skip to content

Commit 226dc4e

Browse files
committed
Merge branch 'actor-mailbox-v2' into onion-messaging-1
* actor-mailbox-v2: actor: refactor Actor to use Mailbox interface actor: add tests for mailbox implementation actor: introduce generic Mailbox interface with iter.Seq support actor: add README.md actor: add example files actor: add the actor system and router actor: add fundamental interfaces and concrete Actor impl actor: add Future[T] and Promise[T] w/ concrete impls actor: add new actor package as distinct sub-module
2 parents 64828aa + 03e7648 commit 226dc4e

18 files changed

+4658
-0
lines changed

actor/README.md

Lines changed: 475 additions & 0 deletions
Large diffs are not rendered by default.

actor/actor.go

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package actor
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/lightningnetwork/lnd/fn/v2"
8+
)
9+
10+
// ActorConfig holds the configuration parameters for creating a new Actor.
11+
// It is generic over M (Message type) and R (Response type) to accommodate
12+
// the actor's specific behavior.
13+
type ActorConfig[M Message, R any] struct {
14+
// ID is the unique identifier for the actor.
15+
ID string
16+
17+
// Behavior defines how the actor responds to messages.
18+
Behavior ActorBehavior[M, R]
19+
20+
// DLO is a reference to the dead letter office for this actor system.
21+
// If nil, undeliverable messages during shutdown or due to a full
22+
// mailbox (if such logic were added) might be dropped.
23+
DLO ActorRef[Message, any]
24+
25+
// MailboxSize defines the buffer capacity of the actor's mailbox.
26+
MailboxSize int
27+
}
28+
29+
// envelope wraps a message with its associated promise. This allows the sender
30+
// of an "ask" message to await a response. If the promise is nil, it
31+
// signifies a "tell" operation (fire-and-forget).
32+
type envelope[M Message, R any] struct {
33+
message M
34+
promise Promise[R]
35+
}
36+
37+
// Actor represents a concrete actor implementation. It encapsulates a behavior,
38+
// manages its internal state implicitly through that behavior, and processes
39+
// messages from its mailbox sequentially in its own goroutine.
40+
type Actor[M Message, R any] struct {
41+
// id is the unique identifier for the actor.
42+
id string
43+
44+
// behavior defines how the actor responds to messages.
45+
behavior ActorBehavior[M, R]
46+
47+
// mailbox is the incoming message queue for the actor.
48+
mailbox Mailbox[M, R]
49+
50+
// ctx is the context governing the actor's lifecycle.
51+
ctx context.Context
52+
53+
// cancel is the function to cancel the actor's context.
54+
cancel context.CancelFunc
55+
56+
// dlo is a reference to the dead letter office for this actor system.
57+
dlo ActorRef[Message, any]
58+
59+
// startOnce ensures the actor's processing loop is started only once.
60+
startOnce sync.Once
61+
62+
// stopOnce ensures the actor's processing loop is stopped only once.
63+
stopOnce sync.Once
64+
65+
// ref is the cached ActorRef for this actor.
66+
ref ActorRef[M, R]
67+
}
68+
69+
// NewActor creates a new actor instance with the given ID and behavior.
70+
// It initializes the actor's internal structures but does not start its
71+
// message processing goroutine. The Start() method must be called to begin
72+
// processing messages.
73+
func NewActor[M Message, R any](cfg ActorConfig[M, R]) *Actor[M, R] {
74+
ctx, cancel := context.WithCancel(context.Background())
75+
76+
// Ensure MailboxSize has a sane default if not specified or zero. A
77+
// capacity of 0 would make the channel unbuffered, which is generally
78+
// not desired for actor mailboxes.
79+
mailboxCapacity := cfg.MailboxSize
80+
if mailboxCapacity <= 0 {
81+
// Default to a small capacity if an invalid one is given. This
82+
// could also come from a global constant.
83+
mailboxCapacity = 1
84+
}
85+
86+
// Create mailbox - could be injected via config in the future.
87+
mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity)
88+
89+
actor := &Actor[M, R]{
90+
id: cfg.ID,
91+
behavior: cfg.Behavior,
92+
mailbox: mailbox,
93+
ctx: ctx,
94+
cancel: cancel,
95+
dlo: cfg.DLO,
96+
}
97+
98+
// Create and cache the actor's own reference.
99+
actor.ref = &actorRefImpl[M, R]{
100+
actor: actor,
101+
}
102+
103+
return actor
104+
}
105+
106+
// Start initiates the actor's message processing loop in a new goroutine. This
107+
// method should be called once after the actor is created.
108+
func (a *Actor[M, R]) Start() {
109+
a.startOnce.Do(func() {
110+
go a.process()
111+
})
112+
}
113+
114+
// process is the main event loop for the actor. It continuously monitors its
115+
// mailbox for incoming messages and its context for cancellation signals.
116+
func (a *Actor[M, R]) process() {
117+
// Use the new iterator pattern for receiving messages.
118+
for env := range a.mailbox.Receive(a.ctx) {
119+
result := a.behavior.Receive(a.ctx, env.message)
120+
121+
// If a promise was provided (i.e., it was an "ask"
122+
// operation), complete the promise with the result from
123+
// the behavior.
124+
if env.promise != nil {
125+
env.promise.Complete(result)
126+
}
127+
}
128+
129+
// Context was cancelled or mailbox closed, drain remaining messages.
130+
a.mailbox.Close()
131+
132+
for env := range a.mailbox.Drain() {
133+
// If a DLO is configured, send the original message there
134+
// for auditing or potential manual reprocessing.
135+
if a.dlo != nil {
136+
a.dlo.Tell(context.Background(), env.message)
137+
}
138+
139+
// If it was an Ask, complete the promise with an error
140+
// indicating the actor terminated.
141+
if env.promise != nil {
142+
env.promise.Complete(fn.Err[R](ErrActorTerminated))
143+
}
144+
}
145+
}
146+
147+
// Stop signals the actor to terminate its processing loop and shut down.
148+
// This is achieved by cancelling the actor's internal context. The actor's
149+
// goroutine will exit once it detects the context cancellation.
150+
func (a *Actor[M, R]) Stop() {
151+
a.stopOnce.Do(func() {
152+
a.cancel()
153+
})
154+
}
155+
156+
// actorRefImpl provides a concrete implementation of the ActorRef interface. It
157+
// holds a reference to the target Actor instance, enabling message sending.
158+
type actorRefImpl[M Message, R any] struct {
159+
actor *Actor[M, R]
160+
}
161+
162+
// Tell sends a message without waiting for a response. If the context is
163+
// cancelled before the message can be sent to the actor's mailbox, the message
164+
// may be dropped.
165+
//
166+
//nolint:lll
167+
func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) {
168+
// If the actor's own context is already done, don't try to send.
169+
// Route to DLO if available.
170+
if ref.actor.ctx.Err() != nil {
171+
ref.trySendToDLO(msg)
172+
return
173+
}
174+
175+
env := envelope[M, R]{message: msg, promise: nil}
176+
177+
// Use mailbox Send method which internally checks both contexts.
178+
if !ref.actor.mailbox.Send(ctx, env) {
179+
// Failed to send - check if actor terminated.
180+
if ref.actor.ctx.Err() != nil {
181+
ref.trySendToDLO(msg)
182+
}
183+
// Otherwise it was the caller's context that cancelled.
184+
}
185+
}
186+
187+
// Ask sends a message and returns a Future for the response. The Future will be
188+
// completed with the actor's reply or an error if the operation fails (e.g.,
189+
// context cancellation before send).
190+
//
191+
//nolint:lll
192+
func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] {
193+
// Create a new promise that will be fulfilled with the actor's response.
194+
promise := NewPromise[R]()
195+
196+
// If the actor's own context is already done, complete the promise with
197+
// ErrActorTerminated and return immediately. This is the primary guard
198+
// against trying to send to a stopped actor.
199+
if ref.actor.ctx.Err() != nil {
200+
promise.Complete(fn.Err[R](ErrActorTerminated))
201+
return promise.Future()
202+
}
203+
204+
env := envelope[M, R]{message: msg, promise: promise}
205+
206+
// Use mailbox Send method which internally checks both contexts.
207+
if !ref.actor.mailbox.Send(ctx, env) {
208+
// Determine the error based on what failed.
209+
if ref.actor.ctx.Err() != nil {
210+
promise.Complete(fn.Err[R](ErrActorTerminated))
211+
} else {
212+
promise.Complete(fn.Err[R](ctx.Err()))
213+
}
214+
}
215+
216+
// Return the future associated with the promise, allowing the caller to
217+
// await the response.
218+
return promise.Future()
219+
}
220+
221+
// trySendToDLO attempts to send the message to the actor's DLO if configured.
222+
func (ref *actorRefImpl[M, R]) trySendToDLO(msg M) {
223+
if ref.actor.dlo != nil {
224+
// Use context.Background() for sending to DLO as the
225+
// original context might be done or the operation
226+
// should not be bound by it.
227+
// This Tell to DLO is fire-and-forget.
228+
ref.actor.dlo.Tell(context.Background(), msg)
229+
}
230+
}
231+
232+
// ID returns the unique identifier for this actor.
233+
func (ref *actorRefImpl[M, R]) ID() string {
234+
return ref.actor.id
235+
}
236+
237+
// Ref returns an ActorRef for this actor. This allows clients to interact with
238+
// the actor (send messages) without having direct access to the Actor struct
239+
// itself, promoting encapsulation and location transparency.
240+
func (a *Actor[M, R]) Ref() ActorRef[M, R] {
241+
return a.ref
242+
}
243+
244+
// TellRef returns a TellOnlyRef for this actor. This allows clients to send
245+
// messages to the actor using only the "tell" pattern (fire-and-forget),
246+
// without having access to "ask" capabilities.
247+
func (a *Actor[M, R]) TellRef() TellOnlyRef[M] {
248+
return a.ref
249+
}

0 commit comments

Comments
 (0)