@@ -26,6 +26,13 @@ type EventLoop struct {
2626 preparer FirstEventBatchPreparer
2727 eventCh <- chan interface {}
2828 logger logr.Logger
29+
30+ // The EventLoop uses double buffering to handle event batch processing.
31+ // The goroutine that handles the batch will always read from the currentBatch slice.
32+ // While the current batch is being handled, new events are added to the nextBatch slice.
33+ // The batches are swapped before starting the handler goroutine.
34+ currentBatch EventBatch
35+ nextBatch EventBatch
2936}
3037
3138// NewEventLoop creates a new EventLoop.
@@ -36,37 +43,38 @@ func NewEventLoop(
3643 preparer FirstEventBatchPreparer ,
3744) * EventLoop {
3845 return & EventLoop {
39- eventCh : eventCh ,
40- logger : logger ,
41- handler : handler ,
42- preparer : preparer ,
46+ eventCh : eventCh ,
47+ logger : logger ,
48+ handler : handler ,
49+ preparer : preparer ,
50+ currentBatch : make (EventBatch , 0 ),
51+ nextBatch : make (EventBatch , 0 ),
4352 }
4453}
4554
4655// Start starts the EventLoop.
4756// This method will block until the EventLoop stops, which will happen after the ctx is closed.
4857func (el * EventLoop ) Start (ctx context.Context ) error {
49- // The current batch.
50- var batch EventBatch
5158 // handling tells if any batch is currently being handled.
5259 var handling bool
5360 // handlingDone is used to signal the completion of handling a batch.
5461 handlingDone := make (chan struct {})
5562
56- handleAndResetBatch := func () {
63+ handleBatch := func () {
5764 go func (batch EventBatch ) {
5865 el .logger .Info ("Handling events from the batch" , "total" , len (batch ))
5966
6067 el .handler .HandleEventBatch (ctx , batch )
6168
6269 el .logger .Info ("Finished handling the batch" )
6370 handlingDone <- struct {}{}
64- }(batch )
71+ }(el .currentBatch )
72+ }
6573
66- // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations.
67- // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer
68- // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity.
69- batch = make ([] interface {}, 0 )
74+ swapAndHandleBatch := func () {
75+ el . swapBatches ()
76+ handleBatch ()
77+ handling = true
7078 }
7179
7280 // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources.
@@ -81,13 +89,13 @@ func (el *EventLoop) Start(ctx context.Context) error {
8189 // not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation.
8290
8391 var err error
84- batch , err = el .preparer .Prepare (ctx )
92+ el . currentBatch , err = el .preparer .Prepare (ctx )
8593 if err != nil {
8694 return fmt .Errorf ("failed to prepare the first batch: %w" , err )
8795 }
8896
8997 // Handle the first batch
90- handleAndResetBatch ()
98+ handleBatch ()
9199 handling = true
92100
93101 // Note: at any point of time, no more than one batch is currently being handled.
@@ -103,28 +111,32 @@ func (el *EventLoop) Start(ctx context.Context) error {
103111 return nil
104112 case e := <- el .eventCh :
105113 // Add the event to the current batch.
106- batch = append (batch , e )
114+ el . nextBatch = append (el . nextBatch , e )
107115
108116 // FIXME(pleshakov): Log more details about the event like resource GVK and ns/name.
109117 el .logger .Info (
110- "added an event to the current batch" ,
118+ "added an event to the next batch" ,
111119 "type" , fmt .Sprintf ("%T" , e ),
112- "total" , len (batch ),
120+ "total" , len (el . nextBatch ),
113121 )
114122
115- // Handle the current batch if no batch is being handled.
123+ // If no batch is currently being handled, swap batches and begin handling the batch .
116124 if ! handling {
117- handleAndResetBatch ()
118- handling = true
125+ swapAndHandleBatch ()
119126 }
120127 case <- handlingDone :
121128 handling = false
122129
123- // Handle the current batch if it has at least one event.
124- if len (batch ) > 0 {
125- handleAndResetBatch ()
126- handling = true
130+ // If there's at least one event in the next batch, swap batches and begin handling the batch.
131+ if len (el .nextBatch ) > 0 {
132+ swapAndHandleBatch ()
127133 }
128134 }
129135 }
130136}
137+
138+ // swapBatches swaps the current and next batches.
139+ func (el * EventLoop ) swapBatches () {
140+ el .currentBatch , el .nextBatch = el .nextBatch , el .currentBatch
141+ el .nextBatch = el .nextBatch [:0 ]
142+ }
0 commit comments