
Commit 0be738e

Added max_retries and failed_items_path configs
Updated buffer_flush and upload()
1 parent: 5941adf

File tree: 3 files changed (+88, -142 lines)


lib/logstash/outputs/kusto.rb

Lines changed: 7 additions & 1 deletion
@@ -75,11 +75,17 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
   config :max_interval, validate: :number, default: 10
 
+  # Maximum number of retries before the flush fails, defaults to 3
+  config :max_retries, validate: :number, default: 3
+
+  # Path to store failed items, defaults to nil
+  config :failed_items_path, validate: :string, default: nil
+
   default :codec, 'json_lines'
 
   def register
     # Initialize the custom buffer with size and interval
-    @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
+    @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events|
       flush_buffer(events)
     end

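For reference, here is roughly how the two new options would be set from a pipeline definition. This is a minimal sketch: `ingest_url`, `database`, `table`, and `path` are placeholder values for the plugin's existing connection options, not part of this commit.

```conf
output {
  kusto {
    path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
    ingest_url => "https://ingest-<cluster>.kusto.windows.net/"
    database => "MyDatabase"
    table => "MyTable"
    max_interval => 10                               # flush at least every 10 seconds
    max_retries => 3                                 # new: retry a failed flush up to 3 times
    failed_items_path => "/var/log/kusto_failed.log" # new: spill events here once retries are exhausted
  }
}
```

With `failed_items_path` unset, events that still fail after `max_retries` attempts are dropped with only a warning, so setting it is the safer choice in production.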
lib/logstash/outputs/kusto/custom_size_based_buffer.rb

Lines changed: 67 additions & 103 deletions
@@ -1,179 +1,143 @@
 require 'logger'
 require 'thread'
-require 'fileutils'
-require 'securerandom'
+require 'csv'
 
 module LogStash
   module Outputs
     class CustomSizeBasedBuffer
-      def initialize(max_size_mb, max_interval, &flush_callback)
+      def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback)
         @buffer_config = {
           max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
           max_interval: max_interval,
-          buffer_dir: './tmp/buffer_storage/',
+          max_retries: max_retries,
+          failed_items_path: failed_items_path,
           logger: Logger.new(STDOUT)
         }
         @buffer_state = {
           pending_items: [],
           pending_size: 0,
           last_flush: Time.now.to_i,
-          timer: nil,
-          network_down: false
-        }
-        @flush_callback = flush_callback
-        @shutdown = false
-        @pending_mutex = Mutex.new
-        @flush_mutex = Mutex.new
-        load_buffer_from_files
-        @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
-
-        # Start the timer thread after a delay to ensure initializations are completed
-        Thread.new do
-          sleep(10)
-          @buffer_state[:timer] = Thread.new do
+          timer: Thread.new do
            loop do
              sleep(@buffer_config[:max_interval])
              buffer_flush(force: true)
            end
          end
-        end
+        }
+        @flush_callback = flush_callback
+        @pending_mutex = Mutex.new
+        @flush_mutex = Mutex.new
+        @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}")
       end
 
-        def <<(event)
-          while buffer_full? do
-            sleep 0.1
-          end
+      def <<(event)
+        while buffer_full? do
+          sleep 0.1
+        end
 
         @pending_mutex.synchronize do
           @buffer_state[:pending_items] << event
           @buffer_state[:pending_size] += event.bytesize
         end
+
+        # Trigger a flush if the buffer size exceeds the maximum size
+        if buffer_full?
+          buffer_flush(force: true)
+        end
       end
 
       def shutdown
         @buffer_config[:logger].info("Shutting down buffer")
-        @shutdown = true
         @buffer_state[:timer].kill
         buffer_flush(final: true)
-        clear_buffer_files
       end
 
-        private
+      private
 
-        def buffer_full?
-          @pending_mutex.synchronize do
-            @buffer_state[:pending_size] >= @buffer_config[:max_size]
-          end
-        end
-
-        def buffer_flush(options = {})
-          force = options[:force] || options[:final]
-          final = options[:final]
+      def buffer_full?
+        @pending_mutex.synchronize do
+          @buffer_state[:pending_size] >= @buffer_config[:max_size]
+        end
+      end
 
-          if final
-            @flush_mutex.lock
-          elsif !@flush_mutex.try_lock
-            return 0
-          end
+      def buffer_flush(options = {})
+        force = options[:force] || options[:final]
+        final = options[:final]
 
-          items_flushed = 0
+        if final
+          @flush_mutex.lock
+        elsif !@flush_mutex.try_lock
+          return 0
+        end
 
        begin
          outgoing_items = []
          outgoing_size = 0
 
          @pending_mutex.synchronize do
            return 0 if @buffer_state[:pending_size] == 0
-
-              time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
+            time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
 
            if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
              return 0
            end
 
            if force
-              @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
-            elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
-              @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
-            else
-              @buffer_config[:logger].info("Flush triggered without specific condition")
+              if time_since_last_flush >= @buffer_config[:max_interval]
+                @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
+              else
+                @buffer_config[:logger].info("Size-based flush triggered when #{@buffer_state[:pending_size]} bytes was reached")
+              end
            end
 
            outgoing_items = @buffer_state[:pending_items].dup
            outgoing_size = @buffer_state[:pending_size]
-            @buffer_state[:pending_items] = []
-            @buffer_state[:pending_size] = 0
+            buffer_initialize
          end
 
+          retries = 0
          begin
+            @buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network")
            @flush_callback.call(outgoing_items) # Pass the list of events to the callback
-            @buffer_state[:network_down] = false # Reset network status after successful flush
-            flush_buffer_files # Flush buffer files if any exist
+            @buffer_state[:last_flush] = Time.now.to_i
+            @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
          rescue => e
-            @buffer_config[:logger].error("Flush failed: #{e.message}")
-            @buffer_state[:network_down] = true
-            save_buffer_to_file(outgoing_items)
+            retries += 1
+            if retries <= @buffer_config[:max_retries]
+              @buffer_config[:logger].error("Flush failed: #{e.message}. \nRetrying (#{retries}/#{@buffer_config[:max_retries]})...")
+              sleep 1
+              retry
+            else
+              @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes")
+              handle_failed_flush(outgoing_items, e.message)
+            end
          end
 
-          @buffer_state[:last_flush] = Time.now.to_i
-          @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
-
-          items_flushed = outgoing_items.size
        ensure
          @flush_mutex.unlock
        end
-
-        items_flushed
-      end
-
-      def save_buffer_to_file(events)
-        buffer_state_copy = {
-          pending_items: events,
-          pending_size: events.sum(&:bytesize)
-        }
-
-        ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists
-        file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
-        ::File.open(file_path, 'w') do |file|
-          file.write(Marshal.dump(buffer_state_copy))
-        end
-        @buffer_config[:logger].info("Saved buffer state to file: #{file_path}")
       end
 
-      def load_buffer_from_files
-        Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
+      def handle_failed_flush(items, error_message)
+        if @buffer_config[:failed_items_path]
          begin
-            buffer_state = Marshal.load(::File.read(file_path))
-            @buffer_state[:pending_items].concat(buffer_state[:pending_items])
-            @buffer_state[:pending_size] += buffer_state[:pending_size]
-            ::File.delete(file_path)
-          rescue => e
-            @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
-          end
-        end
-        @buffer_config[:logger].info("Loaded buffer state from files")
-      end
-
-      def flush_buffer_files
-        Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
-          begin
-            buffer_state = Marshal.load(::File.read(file_path))
-            @buffer_config[:logger].info("Flushed from file: #{file_path}")
-            @flush_callback.call(buffer_state[:pending_items])
-            ::File.delete(file_path)
-            @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
+            ::File.open(@buffer_config[:failed_items_path], 'a') do |file|
+              items.each do |item|
+                file.puts(item)
+              end
+            end
+            @buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}")
          rescue => e
-            @buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}")
-            break
+            @buffer_config[:logger].error("Failed to store items: #{e.message}")
          end
+        else
+          @buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.")
        end
      end
 
-      def clear_buffer_files
-        Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
-          ::File.delete(file_path)
-        end
-        @buffer_config[:logger].info("Cleared all buffer state files")
+      def buffer_initialize
+        @buffer_state[:pending_items] = []
+        @buffer_state[:pending_size] = 0
      end
    end
  end

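The retry path can be exercised outside Logstash. A minimal sketch, assuming the class above is on the load path; the callback here deliberately fails twice so the first two attempts are retried:

```ruby
require 'logstash/outputs/kusto/custom_size_based_buffer'

attempts = 0

# 1 MB buffer, 5 s interval, 3 retries, spill file for unflushable events.
buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(1, 5, 3, '/tmp/failed_items.log') do |events|
  attempts += 1
  raise 'simulated network error' if attempts < 3 # first two flushes fail and are retried
  puts "flushed #{events.size} events"
end

buffer << '{"message":"hello"}'
buffer << '{"message":"world"}'

sleep 8          # let the timer thread trigger a time-based flush plus retries
buffer.shutdown  # kills the timer, then forces a final flush
```

Note that `retry` re-runs the whole inner `begin` block, so a flush that keeps failing blocks the flush thread for roughly `max_retries` seconds (one `sleep 1` per attempt) before the events are handed to `handle_failed_flush`.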
lib/logstash/outputs/kusto/ingestor.rb

Lines changed: 14 additions & 38 deletions
@@ -75,7 +75,8 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD
       end
 
       @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table)
-
+      @ingestion_properties.setReportLevel(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportLevel::FAILURES_AND_SUCCESSES)
+      @ingestion_properties.setReportMethod(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportMethod::TABLE)
       if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided
         @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping)
         @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)

@@ -101,59 +102,34 @@ def upload_async(data)
           exception = e
         end
       end
-
       # Wait for the task to complete and check for exceptions
       @workers_pool.shutdown
       @workers_pool.wait_for_termination
-
-      if exception
-        @logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace)
-        raise exception
-      end
+
+      raise exception if exception
    rescue Exception => e
      @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace)
      raise e
    end
 
    def upload(data)
-      @logger.info("Sending data to Kusto")
-
+      @logger.debug("Sending data to Kusto")
      if data.size > 0
-        ingestionLatch = java.util.concurrent.CountDownLatch.new(1)
-
-        Thread.new do
-          begin
-            data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
-            ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
-
-            # Check the ingestion status
-            status = ingestion_result.getIngestionStatusCollection.get(0)
-            if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued
-              raise "Failed upload: #{status.errorCodeString}"
-            end
-            @logger.info("Final ingestion status: #{status.status}")
-          rescue => e
-            @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace)
-            if e.message.include?("network")
-              raise e
-            end
-          ensure
-            ingestionLatch.countDown()
-          end
-        end
-
-        # Wait for the ingestion to complete with a timeout
-        if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS)
-          @logger.error('Ingestion timed out, possible network issue.')
-          raise 'Ingestion timed out, possible network issue.'
+        begin
+          data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
+          result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
+        rescue => e
+          @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
+          raise e
        end
      else
        @logger.warn("Data is empty and is not ingested.")
      end
-      @logger.info("Data sent to Kusto.")
+
+      @logger.debug("Data sent to Kusto.")
    rescue => e
      @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
-      raise e # Raise the original error if ingestion fails
+      raise e
    end
 
    def stop
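Since `handle_failed_flush` appends events with `file.puts`, the spill file holds one raw event per line and can be replayed later. A hypothetical recovery sketch; the path and the replay destination are illustrative, as this commit does not include a replay tool:

```ruby
# Replay events spilled by handle_failed_flush (one raw event per line).
failed_path = '/var/log/kusto_failed.log' # must match failed_items_path

File.foreach(failed_path) do |line|
  event = line.chomp
  next if event.empty?
  # Hand each event to whatever ingestion path suits you: a fresh
  # CustomSizeBasedBuffer, a side pipeline, or a manual Kusto ingest.
  puts "re-ingesting: #{event}"
end
```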