@@ -42,64 +42,70 @@ local schema = {
 }


-local function schedule_func_exec(batch_processor, delay, batch)
-    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+local function schedule_func_exec(self, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, self, batch)
     if not hdl then
         core.log.error("failed to create process timer: ", err)
         return
     end
 end


-function execute_func(premature, batch_processor, batch)
+function execute_func(premature, self, batch)
     if premature then
         return
     end

-    local ok, err = batch_processor.func(batch.entries, batch_processor.batch_max_size)
+    local ok, err = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", batch_processor.name, "] failed to process entries: ", err)
+        core.log.error("Batch Processor[", self.name,
+                       "] failed to process entries: ", err)
         batch.retry_count = batch.retry_count + 1
-        if batch.retry_count <= batch_processor.max_retry_count then
-            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        if batch.retry_count <= self.max_retry_count then
+            schedule_func_exec(self, self.retry_delay,
+                               batch)
         else
-            core.log.error("Batch Processor[", batch_processor.name,"] exceeded ",
-                "the max_retry_count[", batch.retry_count,"] dropping the entries")
+            core.log.error("Batch Processor[", self.name,"] exceeded ",
+                           "the max_retry_count[", batch.retry_count,
+                           "] dropping the entries")
         end
         return
     end

-    core.log.debug("Batch Processor[", batch_processor.name,"] successfully processed the entries")
+    core.log.debug("Batch Processor[", self.name,
+                   "] successfully processed the entries")
 end


-local function flush_buffer(premature, batch_processor)
+local function flush_buffer(premature, self)
     if premature then
         return
     end

-    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
-       now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
-        core.log.debug("Batch Processor[", batch_processor.name,"] buffer ",
+    if now() - self.last_entry_t >= self.inactive_timeout or
+       now() - self.first_entry_t >= self.buffer_duration
+    then
+        core.log.debug("Batch Processor[", self.name,"] buffer ",
             "duration exceeded, activating buffer flush")
-        batch_processor:process_buffer()
-        batch_processor.is_timer_running = false
+        self:process_buffer()
+        self.is_timer_running = false
         return
     end

-    -- buffer duration did not exceed or the buffer is active, extending the timer
-    core.log.debug("Batch Processor[", batch_processor.name,"] extending buffer timer")
-    create_buffer_timer(batch_processor)
+    -- buffer duration did not exceed or the buffer is active,
+    -- extending the timer
+    core.log.debug("Batch Processor[", self.name,"] extending buffer timer")
+    create_buffer_timer(self)
 end


-function create_buffer_timer(batch_processor)
-    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+function create_buffer_timer(self)
+    local hdl, err = timer_at(self.inactive_timeout, flush_buffer, self)
     if not hdl then
         core.log.error("failed to create buffer timer: ", err)
         return
     end
-    batch_processor.is_timer_running = true
+    self.is_timer_running = true
 end

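Note on the pattern in the hunk above: both timers are created with OpenResty's ngx.timer.at (aliased here as timer_at), which calls its callback with a premature flag followed by whatever extra arguments were passed at creation time. That forwarding is what lets the processor table, now renamed self, travel into execute_func and flush_buffer. Below is a minimal sketch of the same pattern outside APISIX; the processor table and execute function are illustrative only and not part of this patch.

-- A minimal sketch of the ngx.timer.at callback pattern, assuming a plain
-- OpenResty environment. The `processor` table and handler below are
-- illustrative only; they merely mirror the fields this module relies on.
local timer_at = ngx.timer.at

local function execute(premature, self, batch)
    if premature then
        return  -- the worker is shutting down, skip the work
    end
    local ok, err = self.func(batch.entries, self.batch_max_size)
    if not ok then
        ngx.log(ngx.ERR, "processor[", self.name, "] failed: ", err)
    end
end

local processor = {
    name = "example",
    batch_max_size = 10,
    func = function(entries, batch_max_size)
        -- ship `entries` to their destination; return ok, err
        return true
    end,
}

-- extra arguments after the callback are forwarded to it unchanged
local hdl, err = timer_at(0.5, execute, processor, { entries = { "e1", "e2" } })
if not hdl then
    ngx.log(ngx.ERR, "failed to create timer: ", err)
end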
@@ -149,7 +155,8 @@ function Batch_Processor:push(entry)
     self.last_entry_t = now()

     if self.batch_max_size <= #entries then
-        core.log.debug("Batch Processor[", self.name,"] batch max size has exceeded")
+        core.log.debug("Batch Processor[", self.name,
+                       "] batch max size has exceeded")
         self:process_buffer()
     end

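The second hunk only re-wraps a log call inside Batch_Processor:push; the behavior is unchanged: once the buffered entries reach batch_max_size, process_buffer runs immediately instead of waiting for the inactive_timeout or buffer_duration timers. A rough usage sketch follows; the require path and the new() constructor signature are assumptions for illustration, while the configuration fields and push come from this file.

-- Illustrative driver for the batch processor; the module path and the
-- batch_processor:new(handler, config) signature are assumptions here.
local batch_processor = require("apisix.utils.batch-processor")

local function send_entries(entries, batch_max_size)
    -- e.g. POST the entries to a log collector; must return ok, err
    return true
end

local config = {
    name = "example-logger",
    batch_max_size = 2,     -- flush as soon as two entries are buffered
    max_retry_count = 3,
    retry_delay = 1,
    inactive_timeout = 5,
    buffer_duration = 60,
}

local processor, err = batch_processor:new(send_entries, config)
if not processor then
    ngx.log(ngx.ERR, "failed to create batch processor: ", err)
    return
end

processor:push({ route_id = "1" })
processor:push({ route_id = "2" })  -- reaches batch_max_size, triggers process_buffer()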