From 5941adfbb90a024de3a8480a063ed0e023cc0da9 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Wed, 16 Oct 2024 17:46:03 +0530 Subject: [PATCH 1/8] * Refactor to classes --- lib/logstash/outputs/kusto.rb | 18 +- .../outputs/kusto/custom_size_based_buffer.rb | 42 ++--- lib/logstash/outputs/kusto/ingestor.rb | 73 +++------ lib/logstash/outputs/kusto/interval.rb | 126 +++++++------- .../kusto/kustoLogstashConfiguration.rb | 155 ++++++++++++++++++ spec/outputs/kusto/ingestor_spec.rb | 97 ----------- spec/outputs/kusto/interval_spec.rb | 69 ++++++++ .../kusto/kustoLogstashConfiguration_spec.rb | 106 ++++++++++++ 8 files changed, 444 insertions(+), 242 deletions(-) create mode 100644 lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb delete mode 100755 spec/outputs/kusto/ingestor_spec.rb create mode 100644 spec/outputs/kusto/interval_spec.rb create mode 100755 spec/outputs/kusto/kustoLogstashConfiguration_spec.rb diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 6fa160b4..18a1d501 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -7,6 +7,7 @@ require 'logstash/outputs/kusto/ingestor' require 'logstash/outputs/kusto/interval' require 'logstash/outputs/kusto/custom_size_based_buffer' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' ## # This plugin sends messages to Azure Kusto in batches. @@ -91,13 +92,13 @@ def register max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor) - - # Deprecation warning for path - if @path - @logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.") - end + + kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping) + kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth) + kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) + @kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) + @ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor) + end @@ -114,8 +115,7 @@ def multi_receive_encoded(events_and_encoded) def close @logger.info("Closing Kusto output plugin") - - begin + begin @buffer.shutdown unless @buffer.nil? @logger.info("Buffer shutdown") unless @buffer.nil? rescue => e diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 7c680266..6a012276 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -39,10 +39,10 @@ def initialize(max_size_mb, max_interval, &flush_callback) end end - def <<(event) - while buffer_full? do - sleep 0.1 - end + def <<(event) + while buffer_full? do + sleep 0.1 + end @pending_mutex.synchronize do @buffer_state[:pending_items] << event @@ -58,23 +58,23 @@ def shutdown clear_buffer_files end - private + private - def buffer_full? - @pending_mutex.synchronize do - @buffer_state[:pending_size] >= @buffer_config[:max_size] - end - end + def buffer_full? + @pending_mutex.synchronize do + @buffer_state[:pending_size] >= @buffer_config[:max_size] + end + end - def buffer_flush(options = {}) - force = options[:force] || options[:final] - final = options[:final] + def buffer_flush(options = {}) + force = options[:force] || options[:final] + final = options[:final] - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 - end + if final + @flush_mutex.lock + elsif !@flush_mutex.try_lock + return 0 + end items_flushed = 0 @@ -85,7 +85,7 @@ def buffer_flush(options = {}) @pending_mutex.synchronize do return 0 if @buffer_state[:pending_size] == 0 - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] return 0 @@ -123,8 +123,8 @@ def buffer_flush(options = {}) @flush_mutex.unlock end - items_flushed - end + items_flushed + end def save_buffer_to_file(events) buffer_state_copy = { diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index e991d5a5..392cb0a1 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,38 +20,39 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth) + #Validate and assign + kusto_logstash_configuration.validate_config() + @kusto_logstash_configuration = kusto_logstash_configuration + @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - # If there is CLI Auth, use that instead of managed identity - is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth) + + is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn # Create a connection string kusto_connection_string = if is_managed_identity if is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id) end else - if cli_auth + if @kusto_logstash_configuration.kusto_auth.cli_auth @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*') - kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using app id and app key.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant) end end @logger.debug(Gem.loaded_specs.to_s) @@ -62,22 +63,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli tuple_utils = Java::org.apache.commons.lang3.tuple # kusto_connection_string.setClientVersionForTracing(name_for_tracing) version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown" - kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); + kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) + + if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided + @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) + @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @@ -86,38 +87,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli @logger.debug('Kusto resources are ready.') end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - if cli_auth - @logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production') - else - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') - end - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end - - end - def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") diff --git a/lib/logstash/outputs/kusto/interval.rb b/lib/logstash/outputs/kusto/interval.rb index 33046309..6ba8d4a4 100755 --- a/lib/logstash/outputs/kusto/interval.rb +++ b/lib/logstash/outputs/kusto/interval.rb @@ -5,77 +5,77 @@ require 'logstash/errors' class LogStash::Outputs::Kusto < LogStash::Outputs::Base - ## - # Bare-bones utility for running a block of code at an interval. - # - class Interval - ## - # Initializes a new Interval with the given arguments and starts it - # before returning it. - # - # @param interval [Integer] (see: Interval#initialize) - # @param procsy [#call] (see: Interval#initialize) - # - # @return [Interval] - # - def self.start(interval, procsy) - new(interval, procsy).tap(&:start) - end + ## + # Bare-bones utility for running a block of code at an interval. + # + class Interval + ## + # Initializes a new Interval with the given arguments and starts it + # before returning it. + # + # @param interval [Integer] (see: Interval#initialize) + # @param procsy [#call] (see: Interval#initialize) + # + # @return [Interval] + # + def self.start(interval, procsy) + new(interval, procsy).tap(&:start) + end - ## - # @param interval [Integer]: time in seconds to wait between calling the given proc - # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. - def initialize(interval, procsy) - @interval = interval - @procsy = procsy + ## + # @param interval [Integer]: time in seconds to wait between calling the given proc + # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. + def initialize(interval, procsy) + @interval = interval + @procsy = procsy - # Mutex, ConditionVariable, etc. - @mutex = Mutex.new - @sleeper = ConditionVariable.new - end + # Mutex, ConditionVariable, etc. + @mutex = Mutex.new + @sleeper = ConditionVariable.new + end - ## - # Starts the interval, or returns if it has already been started. - # - # @return [void] - def start - @mutex.synchronize do - return if @thread && @thread.alive? + ## + # Starts the interval, or returns if it has already been started. + # + # @return [void] + def start + @mutex.synchronize do + return if @thread && @thread.alive? - @thread = Thread.new { run } - end - end + @thread = Thread.new { run } + end + end - ## - # Stop the interval. - # Does not interrupt if execution is in-progress. - def stop - @mutex.synchronize do - @stopped = true - end + ## + # Stop the interval. + # Does not interrupt if execution is in-progress. + def stop + @mutex.synchronize do + @stopped = true + end - @thread && @thread.join - end + @thread && @thread.join + end - ## - # @return [Boolean] - def alive? - @thread && @thread.alive? - end + ## + # @return [Boolean] + def alive? + @thread && @thread.alive? + end - private + private - def run - @mutex.synchronize do - loop do - @sleeper.wait(@mutex, @interval) - break if @stopped + def run + @mutex.synchronize do + loop do + @sleeper.wait(@mutex, @interval) + break if @stopped - @procsy.call - end - end - ensure - @sleeper.broadcast - end - end + @procsy.call + end + end + ensure + @sleeper.broadcast + end + end end diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 00000000..0aecb874 --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,155 @@ +# encoding: utf-8 +# A class just having all the configurations wrapped into a seperate object +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(kusto_ingest,kusto_auth, kusto_proxy, logger) + @logger = logger + @kusto_ingest = kusto_ingest + @kusto_auth = kusto_auth + @kusto_proxy = kusto_proxy + @logger.info("Kusto configuration initialized.") + end # def initialize + + # Configuration + def kusto_ingest + @kusto_ingest + end + def kusto_auth + @kusto_auth + end + def kusto_proxy + @kusto_proxy + end + + def validate_config() + # Add an additional validation and fail this upfront + if @kusto_auth.app_id.to_s.empty? && @kusto_auth.managed_identity_id.to_s.empty? && !@kusto_auth.cli_auth + @logger.error('managed_identity_id is not provided, cli_auth is false and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', @kusto_proxy.proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @kusto_ingest.database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', @kusto_ingest.database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + if @kusto_ingest.table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', @kusto_ingest.table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + if @kusto_ingest.json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', @kusto_ingest.json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + if not(["https", "http"].include? @kusto_proxy.proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', @kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',@kusto_proxy.proxy_host,@kusto_proxy.proxy_port,@kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.') + end + # If all validation pass then configuration is valid + return true + end #validate_config() + + end # class KustoLogstashConfiguration + class KustoAuthConfiguration + def initialize(app_id, app_key, app_tenant, managed_identity_id, cli_auth) + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + @cli_auth = cli_auth + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? && !cli_auth + @is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(kusto_auth.managed_identity_id) + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + def is_managed_identity + @is_managed_identity + end + def cli_auth + @cli_auth + end + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + end # class KustoAuthConfiguration + class KustoProxyConfiguration + def initialize(proxy_host , proxy_port , proxy_protocol, proxy_aad_only) + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Is it direct connection + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + end + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoProxyConfiguration + class KustoIngestConfiguration + def initialize(ingest_url, database, table, json_mapping) + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + end + # For ingestion + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + end # class KustoIngestionConfiguration + end # module KustoInternal + end # module Outputs +end # module LogStash diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index a077549b..00000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,97 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:cliauth) { false } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:logger) { spy('logger') } - - describe 'Ingestor' do - - it 'does not throw an error when initializing' do - RSpec.configuration.reporter.message("Running test: does not throw an error when initializing") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - RSpec.configuration.reporter.message("Completed test: does not throw an error when initializing") - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - RSpec.configuration.reporter.message("Running test: doesnt allow database to have some dynamic part with database: #{test_database}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, test_database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow database to have some dynamic part with database: #{test_database}") - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with table: #{test_table}" do - RSpec.configuration.reporter.message("Running test: doesnt allow table to have some dynamic part with table: #{test_table}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, test_table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow table to have some dynamic part with table: #{test_table}") - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with mapping: #{json_mapping}" do - RSpec.configuration.reporter.message("Running test: doesnt allow mapping to have some dynamic part with mapping: #{json_mapping}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow mapping to have some dynamic part with mapping: #{json_mapping}") - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - RSpec.configuration.reporter.message("Running test: proxy protocol has to be http or https with proxy protocol: socks") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, 'socks', logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: proxy protocol has to be http or https with proxy protocol: socks") - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - RSpec.configuration.reporter.message("Running test: one of appid or managedid has to be provided with empty managed identity and appid") - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "", cliauth, database, table, json_mapping, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: one of appid or managedid has to be provided with empty managed identity and appid") - end - end - end - -end diff --git a/spec/outputs/kusto/interval_spec.rb b/spec/outputs/kusto/interval_spec.rb new file mode 100644 index 00000000..ccd9cf46 --- /dev/null +++ b/spec/outputs/kusto/interval_spec.rb @@ -0,0 +1,69 @@ +# # spec/interval_test.rb +# require 'rspec' +# require 'logstash/outputs/kusto/interval' + + +# describe LogStash::Outputs::Kusto::Interval do +# let(:interval_time) { 1 } +# let(:procsy) { double("procsy", call: true) } + +# describe '#initialize' do +# it 'initializes with the correct interval and procsy' do +# interval = described_class.new(interval_time, procsy) +# expect(interval.instance_variable_get(:@interval)).to eq(interval_time) +# expect(interval.instance_variable_get(:@procsy)).to eq(procsy) +# end +# end + +# describe '#start' do +# it 'starts the interval thread' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# expect(interval.alive?).to be true +# interval.stop +# end + +# it 'does not start a new thread if already started' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# first_thread = interval.instance_variable_get(:@thread) +# interval.start +# second_thread = interval.instance_variable_get(:@thread) +# expect(first_thread).to eq(second_thread) +# interval.stop +# end +# end + +# describe '#stop' do +# it 'stops the interval thread' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# interval.stop +# expect(interval.alive?).to be false +# end +# end + +# describe '#alive?' do +# it 'returns true if the thread is alive' do +# interval = described_class.new(interval_time, procsy) +# interval.start +# expect(interval.alive?).to be true +# interval.stop +# end + +# it 'returns false if the thread is not alive' do +# interval = described_class.new(interval_time, procsy) +# expect(interval.alive?).to be false +# end +# end + +# describe 'interval execution' do +# it 'calls the proc at the specified interval' do +# interval = described_class.new(interval_time, procsy) +# expect(procsy).to receive(:call).at_least(:twice) +# interval.start +# sleep(2.5) +# interval.stop +# end +# end +# end \ No newline at end of file diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 00000000..cbb09ea7 --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,106 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:cliauth) { false } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + + let(:kusto_ingest_base) { LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) } + let(:kusto_auth_base) { LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cliauth) } + let(:kusto_proxy_base) { LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) } + + describe '#initialize' do + it 'does not throw an error when initializing' do + # note that this will cause an internal error since connection is being tried. + # however we still want to test that all the java stuff is working as expected + expect { + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] + + context 'doesnt allow database to have some dynamic part' do + dynamic_name_array.each do |test_database| + it "with database: #{test_database}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, test_database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow table to have some dynamic part' do + dynamic_name_array.each do |test_table| + it "with database: #{test_table}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, test_table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow mapping to have some dynamic part' do + dynamic_name_array.each do |json_mapping| + it "with database: #{json_mapping}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'proxy protocol has to be http or https' do + it "with proxy protocol: socks" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , 'socks', false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid or cli_auth has to be provided' do + it "with empty managed identity and appid" do + expect { + kusto_auth = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new("", app_key, app_tenant, "", false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'if proxy_aad is provided' do + it "proxy details should be provided" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new("" , "" , proxy_protocol, true) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end +end \ No newline at end of file From 0be738e368a3c92620c94f2615b9efc5a51ebeb3 Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Sun, 27 Oct 2024 15:42:21 +0530 Subject: [PATCH 2/8] Added max_retries and failed_items_path() configs Updated buffer_flush and upload() --- lib/logstash/outputs/kusto.rb | 8 +- .../outputs/kusto/custom_size_based_buffer.rb | 170 +++++++----------- lib/logstash/outputs/kusto/ingestor.rb | 52 ++---- 3 files changed, 88 insertions(+), 142 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 18a1d501..246980df 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -75,11 +75,17 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10 config :max_interval, validate: :number, default: 10 + # Maximum number of retries before the flush fails, defaults to 3 + config :max_retries, validate: :number, default: 3 + + # Path to store failed items, defaults to nil + config :failed_items_path, validate: :string, default: nil + default :codec, 'json_lines' def register # Initialize the custom buffer with size and interval - @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| + @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events| flush_buffer(events) end diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 6a012276..646d61a5 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -1,82 +1,74 @@ require 'logger' require 'thread' -require 'fileutils' -require 'securerandom' +require 'csv' module LogStash module Outputs class CustomSizeBasedBuffer - def initialize(max_size_mb, max_interval, &flush_callback) + def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback) @buffer_config = { max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes max_interval: max_interval, - buffer_dir: './tmp/buffer_storage/', + max_retries: max_retries, + failed_items_path: failed_items_path, logger: Logger.new(STDOUT) } @buffer_state = { pending_items: [], pending_size: 0, last_flush: Time.now.to_i, - timer: nil, - network_down: false - } - @flush_callback = flush_callback - @shutdown = false - @pending_mutex = Mutex.new - @flush_mutex = Mutex.new - load_buffer_from_files - @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds") - - # Start the timer thread after a delay to ensure initializations are completed - Thread.new do - sleep(10) - @buffer_state[:timer] = Thread.new do + timer: Thread.new do loop do sleep(@buffer_config[:max_interval]) buffer_flush(force: true) end end - end + } + @flush_callback = flush_callback + @pending_mutex = Mutex.new + @flush_mutex = Mutex.new + @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}") end - def <<(event) - while buffer_full? do - sleep 0.1 - end + def <<(event) + while buffer_full? do + sleep 0.1 + end @pending_mutex.synchronize do @buffer_state[:pending_items] << event @buffer_state[:pending_size] += event.bytesize end + + # Trigger a flush if the buffer size exceeds the maximum size + if buffer_full? + buffer_flush(force: true) + end end def shutdown @buffer_config[:logger].info("Shutting down buffer") - @shutdown = true @buffer_state[:timer].kill buffer_flush(final: true) - clear_buffer_files end - private + private - def buffer_full? - @pending_mutex.synchronize do - @buffer_state[:pending_size] >= @buffer_config[:max_size] - end - end - - def buffer_flush(options = {}) - force = options[:force] || options[:final] - final = options[:final] + def buffer_full? + @pending_mutex.synchronize do + @buffer_state[:pending_size] >= @buffer_config[:max_size] + end + end - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 - end + def buffer_flush(options = {}) + force = options[:force] || options[:final] + final = options[:final] - items_flushed = 0 + if final + @flush_mutex.lock + elsif !@flush_mutex.try_lock + return 0 + end begin outgoing_items = [] @@ -84,96 +76,68 @@ def buffer_flush(options = {}) @pending_mutex.synchronize do return 0 if @buffer_state[:pending_size] == 0 - - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] return 0 end if force - @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") - elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] - @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") - else - @buffer_config[:logger].info("Flush triggered without specific condition") + if time_since_last_flush >= @buffer_config[:max_interval] + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + else + @buffer_config[:logger].info("Size-based flush triggered when #{@buffer_state[:pending_size]} bytes was reached") + end end outgoing_items = @buffer_state[:pending_items].dup outgoing_size = @buffer_state[:pending_size] - @buffer_state[:pending_items] = [] - @buffer_state[:pending_size] = 0 + buffer_initialize end + retries = 0 begin + @buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network") @flush_callback.call(outgoing_items) # Pass the list of events to the callback - @buffer_state[:network_down] = false # Reset network status after successful flush - flush_buffer_files # Flush buffer files if any exist + @buffer_state[:last_flush] = Time.now.to_i + @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") rescue => e - @buffer_config[:logger].error("Flush failed: #{e.message}") - @buffer_state[:network_down] = true - save_buffer_to_file(outgoing_items) + retries += 1 + if retries <= @buffer_config[:max_retries] + @buffer_config[:logger].error("Flush failed: #{e.message}. \nRetrying (#{retries}/#{@buffer_config[:max_retries]})...") + sleep 1 + retry + else + @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes") + handle_failed_flush(outgoing_items, e.message) + end end - @buffer_state[:last_flush] = Time.now.to_i - @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") - - items_flushed = outgoing_items.size ensure @flush_mutex.unlock end - - items_flushed - end - - def save_buffer_to_file(events) - buffer_state_copy = { - pending_items: events, - pending_size: events.sum(&:bytesize) - } - - ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists - file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log") - ::File.open(file_path, 'w') do |file| - file.write(Marshal.dump(buffer_state_copy)) - end - @buffer_config[:logger].info("Saved buffer state to file: #{file_path}") end - def load_buffer_from_files - Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| + def handle_failed_flush(items, error_message) + if @buffer_config[:failed_items_path] begin - buffer_state = Marshal.load(::File.read(file_path)) - @buffer_state[:pending_items].concat(buffer_state[:pending_items]) - @buffer_state[:pending_size] += buffer_state[:pending_size] - ::File.delete(file_path) - rescue => e - @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}") - end - end - @buffer_config[:logger].info("Loaded buffer state from files") - end - - def flush_buffer_files - Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - begin - buffer_state = Marshal.load(::File.read(file_path)) - @buffer_config[:logger].info("Flushed from file: #{file_path}") - @flush_callback.call(buffer_state[:pending_items]) - ::File.delete(file_path) - @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") + ::File.open(@buffer_config[:failed_items_path], 'a') do |file| + items.each do |item| + file.puts(item) + end + end + @buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}") rescue => e - @buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}") - break + @buffer_config[:logger].error("Failed to store items: #{e.message}") end + else + @buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.") end end - def clear_buffer_files - Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - ::File.delete(file_path) - end - @buffer_config[:logger].info("Cleared all buffer state files") + def buffer_initialize + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 end end end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 392cb0a1..f06a282a 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -75,7 +75,8 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD end @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) - + @ingestion_properties.setReportLevel(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportLevel::FAILURES_AND_SUCCESSES) + @ingestion_properties.setReportMethod(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportMethod::TABLE) if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @@ -101,59 +102,34 @@ def upload_async(data) exception = e end end - # Wait for the task to complete and check for exceptions @workers_pool.shutdown @workers_pool.wait_for_termination - - if exception - @logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace) - raise exception - end + + raise exception if exception rescue Exception => e @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace) raise e end def upload(data) - @logger.info("Sending data to Kusto") - + @logger.debug("Sending data to Kusto") if data.size > 0 - ingestionLatch = java.util.concurrent.CountDownLatch.new(1) - - Thread.new do - begin - data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - - # Check the ingestion status - status = ingestion_result.getIngestionStatusCollection.get(0) - if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued - raise "Failed upload: #{status.errorCodeString}" - end - @logger.info("Final ingestion status: #{status.status}") - rescue => e - @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) - if e.message.include?("network") - raise e - end - ensure - ingestionLatch.countDown() - end - end - - # Wait for the ingestion to complete with a timeout - if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS) - @logger.error('Ingestion timed out, possible network issue.') - raise 'Ingestion timed out, possible network issue.' + begin + data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) + result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + rescue => e + @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) + raise e end else @logger.warn("Data is empty and is not ingested.") end - @logger.info("Data sent to Kusto.") + + @logger.debug("Data sent to Kusto.") rescue => e @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e # Raise the original error if ingestion fails + raise e end def stop From 7da8798da4064cbea9ab66fa69c9a8e2ca5baa78 Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Tue, 5 Nov 2024 15:59:55 +0530 Subject: [PATCH 3/8] Updated README.md and configs Updated failed_items_path to be a required config parameter. If set to "nil", the failed items will not be persisted to local storage, else the items are stored in the file path provided after max_retries. --- README.md | 68 ++++++++++--------- lib/logstash/outputs/kusto.rb | 12 ++-- .../outputs/kusto/custom_size_based_buffer.rb | 10 +-- 3 files changed, 48 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index b256a266..bd113fbc 100755 --- a/README.md +++ b/README.md @@ -35,19 +35,22 @@ Perform configuration before sending events from Logstash to Azure Data Explorer ```ruby output { - kusto { - path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-.kusto.windows.net/" - app_id => "" - app_key => "" - app_tenant => "" - database => "" - table => "" - json_mapping => "" - proxy_host => "" - proxy_port => - proxy_protocol => <"http"|"https"> - } + kusto { + ingest_url => "https://ingest-.kusto.windows.net/" + app_id => "" + app_key => "" + app_tenant => "" + database => "" + table => "" + json_mapping => "" + proxy_host => "" + proxy_port => + proxy_protocol => <"http"|"https"> + max_size => 10 + max_interval => 10 + max_retries => 3 + failed_items_path => "" + } } ``` More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html) @@ -56,22 +59,24 @@ More information about configuring Logstash can be found in the [logstash config | Parameter Name | Description | Notes | | --- | --- | --- | -| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required -| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required| -| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional| -| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional| -| **database**| Database name to place events | Required | -| **table** | Target table name to place events | Required +| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required | +| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional | +| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional | +| **database** | Database name to place events | Required | +| **table** | Target table name to place events | Required | +| **failed_items_path** | Path to store failed items when max_retries is reached. Set to nil to disable persistence to file (May cause data loss). | Required | | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional | -| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | | -| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| | -| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| | -| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| | -| **proxy_port** | The proxy port for the proxy. Defaults to 80.| | -| **proxy_protocol** | The proxy server protocol , is one of http or https.| | +| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional | +| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional | +| **proxy_protocol** | The proxy server protocol, is one of http or https. | Optional | +| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional | +| **max_interval** | Maximum interval (in seconds) before the buffer gets flushed, defaults to 10. | Optional | +| **max_retries** | Maximum number of retries before the flush fails. Defaults to 3. | Optional | > Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options) +> Note: **path** config parameter is no longer used in the latest release (3.0.0) and will be deprecated in future releases + ```bash export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989" ``` @@ -81,12 +86,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox | Version | Release Date | Notes | | --- | --- | --- | -| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library | -| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK | -| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | -| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | -| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | -| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| +| 3.0.0 | 2024-11-01 | Updated configuration options | +| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library | +| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK | +| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | +| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | +| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | +| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| ## Development Requirements diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 246980df..b53b6485 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -39,6 +39,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :database, validate: :string, required: true # Target table name config :table, validate: :string, required: true + # Path to store failed items when max_retries is reached, set to "nil" to disable persistence to file + config :failed_items_path, validate: :string, required: true + # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. # Note that this must be in JSON format, as this is the interface between Logstash and Kusto # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings @@ -70,16 +73,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :proxy_protocol, validate: :string, required: false , default: 'http' # Maximum size of the buffer before it gets flushed, defaults to 10MB - config :max_size, validate: :number, default: 10 + config :max_size, validate: :number, required: false , default: 10 # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10 - config :max_interval, validate: :number, default: 10 + config :max_interval, validate: :number, required: false , default: 10 # Maximum number of retries before the flush fails, defaults to 3 - config :max_retries, validate: :number, default: 3 - - # Path to store failed items, defaults to nil - config :failed_items_path, validate: :string, default: nil + config :max_retries, validate: :number, required: false , default: 3 default :codec, 'json_lines' diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 646d61a5..e04478a4 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -109,7 +109,7 @@ def buffer_flush(options = {}) retry else @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes") - handle_failed_flush(outgoing_items, e.message) + handle_failed_flush(outgoing_items) end end @@ -118,8 +118,10 @@ def buffer_flush(options = {}) end end - def handle_failed_flush(items, error_message) - if @buffer_config[:failed_items_path] + def handle_failed_flush(items) + if @buffer_config[:failed_items_path].nil? || @buffer_config[:failed_items_path] == "nil" + @buffer_config[:logger].warn("No failed_items_path configured. The failed items are not persisted. Data loss may occur.") + else begin ::File.open(@buffer_config[:failed_items_path], 'a') do |file| items.each do |item| @@ -130,8 +132,6 @@ def handle_failed_flush(items, error_message) rescue => e @buffer_config[:logger].error("Failed to store items: #{e.message}") end - else - @buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.") end end From b624d014920930bf079f86ef1e8bb5dec4ec9c6a Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Mon, 11 Nov 2024 18:57:42 +0530 Subject: [PATCH 4/8] *Make some fixes for retries and upload --- lib/logstash/outputs/kusto.rb | 1 - lib/logstash/outputs/kusto/ingestor.rb | 61 +++++++++++++------------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index b53b6485..893dba75 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -136,7 +136,6 @@ def close @logger.error("Error stopping ingestor: #{e.message}") @logger.error(e.backtrace.join("\n")) end - @logger.info("Kusto output plugin Closed") end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index f06a282a..0f9a6c21 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -21,6 +21,8 @@ class Ingestor FIELD_REF = /%\{[^}]+\}/ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL) + @retry_count = 3 + @retry_delay = 10 @workers_pool = threadpool @logger = logger #Validate and assign @@ -85,6 +87,15 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) end + # retry_policy = Java::com.azure.storage.common.policy + # duration = Java::java.time.Duration.ofSeconds(5) + + # fixed_delay_options = Java::com.azure.core.http.policy.FixedDelayOptions.new(1,duration) + # retry_options = Java::com.azure.core.http.policy.RetryOptions.new(fixed_delay_options) + # req_retry_options = Java::com.azure.storage.common.policy.RequestRetryOptions.fromRetryOptions(retry_options, Java::java.time.Duration.ofSeconds(10), "") + + # queued_ingest_client = @kusto_client.to_java(Java::com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl) + # queued_ingest_client.setQueueRequestOptions(req_retry_options) @logger.debug('Kusto resources are ready.') end @@ -92,44 +103,32 @@ def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") end - exception = nil @workers_pool.post do - LogStash::Util.set_thread_name("Kusto to ingest data") - begin - upload(data) - rescue => e - @logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace) - exception = e - end + LogStash::Util.set_thread_name("Kusto to ingest data #{JRuby.reference(Thread.current).native_thread.id}") + upload(data) end - # Wait for the task to complete and check for exceptions - @workers_pool.shutdown - @workers_pool.wait_for_termination - - raise exception if exception - rescue Exception => e - @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e end def upload(data) - @logger.debug("Sending data to Kusto") - if data.size > 0 - begin - data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - rescue => e - @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e + begin + @logger.debug("Sending data to Kusto") + if data.size > 0 + data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) + result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + else + @logger.warn("Data is empty and is not ingested.") + end + @logger.debug("Data sent to Kusto.") + rescue => e + if tries < @retry_count + tries += 1 + logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) + sleep @retry_delay + retry + else + logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) end - else - @logger.warn("Data is empty and is not ingested.") end - - @logger.debug("Data sent to Kusto.") - rescue => e - @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) - raise e end def stop From 92edb8f74fb755fed3d6d38acd91c3cb0e7a910c Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Thu, 12 Dec 2024 17:56:09 +0530 Subject: [PATCH 5/8] Added Latch Timeout and updated java sdk version --- build.gradle | 6 +- lib/logstash/outputs/kusto.rb | 28 +-- .../outputs/kusto/custom_size_based_buffer.rb | 199 ++++++++++++------ lib/logstash/outputs/kusto/ingestor.rb | 81 ++++--- 4 files changed, 197 insertions(+), 117 deletions(-) diff --git a/build.gradle b/build.gradle index a56b7a35..e92dcb9c 100644 --- a/build.gradle +++ b/build.gradle @@ -29,8 +29,8 @@ repositories { // update dependencies to bom azure-sdk-bom/1.2.24 dependencies { - implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0' - implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0' + implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0' + implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0' implementation 'com.azure:azure-core-http-netty:1.15.1' implementation 'com.azure:azure-core:1.49.1' implementation 'com.azure:azure-data-tables:12.4.2' @@ -52,7 +52,7 @@ dependencies { implementation 'com.nimbusds:nimbus-jose-jwt:9.40' implementation 'com.nimbusds:oauth2-oidc-sdk:11.13' implementation 'com.univocity:univocity-parsers:2.9.1' - implementation 'commons-codec:commons-codec:1.16.1' + implementation 'commons-codec:commons-codec:1.17.1' implementation 'commons-logging:commons-logging:1.3.1' implementation 'io.github.resilience4j:resilience4j-core:1.7.1' implementation 'io.github.resilience4j:resilience4j-retry:1.7.1' diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 893dba75..1011e958 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -39,9 +39,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :database, validate: :string, required: true # Target table name config :table, validate: :string, required: true - # Path to store failed items when max_retries is reached, set to "nil" to disable persistence to file - config :failed_items_path, validate: :string, required: true - # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. # Note that this must be in JSON format, as this is the interface between Logstash and Kusto # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings @@ -78,17 +75,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10 config :max_interval, validate: :number, required: false , default: 10 - # Maximum number of retries before the flush fails, defaults to 3 - config :max_retries, validate: :number, required: false , default: 3 + # Latch timeout in seconds, defaults to 60 + config :latch_timeout, validate: :number, required: false, default: 60 + default :codec, 'json_lines' def register - # Initialize the custom buffer with size and interval - @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events| - flush_buffer(events) - end - @io_mutex = Mutex.new final_mapping = json_mapping @@ -98,13 +91,22 @@ def register max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) - + kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping) kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth) kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) @kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) - @ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor) + @ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, latch_timeout, executor) + # Deprecation warning for path + if @path + @logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.") + end + sleep(30) + # Initialize the custom buffer with size and interval + @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| + flush_buffer(events) + end end @@ -121,7 +123,7 @@ def multi_receive_encoded(events_and_encoded) def close @logger.info("Closing Kusto output plugin") - begin + begin @buffer.shutdown unless @buffer.nil? @logger.info("Buffer shutdown") unless @buffer.nil? rescue => e diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index e04478a4..ce4b8a6d 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -1,37 +1,46 @@ require 'logger' require 'thread' -require 'csv' +require 'fileutils' +require 'securerandom' +require 'net/http' +require 'uri' module LogStash module Outputs class CustomSizeBasedBuffer - def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback) + def initialize(max_size_mb, max_interval, &flush_callback) @buffer_config = { max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes max_interval: max_interval, - max_retries: max_retries, - failed_items_path: failed_items_path, + buffer_dir: './tmp/buffer_storage/', logger: Logger.new(STDOUT) } @buffer_state = { pending_items: [], pending_size: 0, last_flush: Time.now.to_i, - timer: Thread.new do - loop do - sleep(@buffer_config[:max_interval]) - buffer_flush(force: true) - end - end + timer: nil, + network_down: false } @flush_callback = flush_callback + @shutdown = false @pending_mutex = Mutex.new @flush_mutex = Mutex.new - @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}") + load_buffer_from_files + @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds") + + # Start the timer thread + @buffer_state[:timer] = Thread.new do + loop do + sleep(@buffer_config[:max_interval]) + prepare_flush(force: true) + end + end end def <<(event) while buffer_full? do + prepare_flush(force: true) # Flush when buffer is full sleep 0.1 end @@ -39,17 +48,14 @@ def <<(event) @buffer_state[:pending_items] << event @buffer_state[:pending_size] += event.bytesize end - - # Trigger a flush if the buffer size exceeds the maximum size - if buffer_full? - buffer_flush(force: true) - end end def shutdown @buffer_config[:logger].info("Shutting down buffer") + @shutdown = true @buffer_state[:timer].kill - buffer_flush(final: true) + prepare_flush(final: true) + flush_buffer_files end private @@ -60,56 +66,85 @@ def buffer_full? end end - def buffer_flush(options = {}) + def prepare_flush(options = {}) force = options[:force] || options[:final] final = options[:final] - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 + outgoing_items = [] + outgoing_size = 0 + + @pending_mutex.synchronize do + return 0 if @buffer_state[:pending_size] == 0 + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + + if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] + return 0 + end + + if time_since_last_flush >= @buffer_config[:max_interval] + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + else + @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") + end + + if @buffer_state[:network_down] + save_buffer_to_file(@buffer_state[:pending_items]) + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 + return 0 + end + + outgoing_items = @buffer_state[:pending_items].dup + outgoing_size = @buffer_state[:pending_size] + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 end - begin - outgoing_items = [] - outgoing_size = 0 + if Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).any? + @buffer_config[:logger].info("Flushing all buffer state files") + flush_buffer_files + end - @pending_mutex.synchronize do - return 0 if @buffer_state[:pending_size] == 0 - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + Thread.new { perform_flush(outgoing_items) } + end - if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] + def perform_flush(events, file_path = nil) + + @flush_mutex.lock + + begin + if file_path + unless ::File.exist?(file_path) return 0 end - - if force - if time_since_last_flush >= @buffer_config[:max_interval] - @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") - else - @buffer_config[:logger].info("Size-based flush triggered when #{@buffer_state[:pending_size]} bytes was reached") - end + begin + buffer_state = Marshal.load(::File.read(file_path)) + events = buffer_state[:pending_items] + rescue => e + @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}") + return 0 end - - outgoing_items = @buffer_state[:pending_items].dup - outgoing_size = @buffer_state[:pending_size] - buffer_initialize end - - retries = 0 - begin - @buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network") - @flush_callback.call(outgoing_items) # Pass the list of events to the callback - @buffer_state[:last_flush] = Time.now.to_i - @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") - rescue => e - retries += 1 - if retries <= @buffer_config[:max_retries] - @buffer_config[:logger].error("Flush failed: #{e.message}. \nRetrying (#{retries}/#{@buffer_config[:max_retries]})...") - sleep 1 + @buffer_config[:logger].info("Flushing #{events.size} events, #{events.sum(&:bytesize)} bytes") + @flush_callback.call(events) # Pass the list of events to the callback + @buffer_state[:network_down] = false # Reset network status after successful flush + @buffer_state[:last_flush] = Time.now.to_i + @buffer_config[:logger].info("Flush completed. Flushed #{events.size} events, #{events.sum(&:bytesize)} bytes") + + if file_path + ::File.delete(file_path) + @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") + end + + rescue => e + @buffer_config[:logger].error("Flush failed: #{e.message}") + @buffer_state[:network_down] = true + + while true + sleep(2) # Wait before checking network availability again + if network_available? + @buffer_config[:logger].info("Network is back up. Retrying flush.") retry - else - @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes") - handle_failed_flush(outgoing_items) end end @@ -117,27 +152,53 @@ def buffer_flush(options = {}) @flush_mutex.unlock end end + + def network_available? + begin + uri = URI('http://www.google.com') + response = Net::HTTP.get_response(uri) + response.is_a?(Net::HTTPSuccess) + rescue + false + end + end - def handle_failed_flush(items) - if @buffer_config[:failed_items_path].nil? || @buffer_config[:failed_items_path] == "nil" - @buffer_config[:logger].warn("No failed_items_path configured. The failed items are not persisted. Data loss may occur.") - else + def save_buffer_to_file(events) + buffer_state_copy = { + pending_items: events, + pending_size: events.sum(&:bytesize) + } + begin + ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists + file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log") + ::File.open(file_path, 'w') do |file| + file.write(Marshal.dump(buffer_state_copy)) + end + @buffer_config[:logger].info("Saved #{events.size} events to file: #{file_path}") + rescue => e + @buffer_config[:logger].error("Failed to save buffer to file: #{e.message}") + end + end + + def load_buffer_from_files + Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| begin - ::File.open(@buffer_config[:failed_items_path], 'a') do |file| - items.each do |item| - file.puts(item) - end - end - @buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}") + buffer_state = Marshal.load(::File.read(file_path)) + @buffer_state[:pending_items].concat(buffer_state[:pending_items]) + @buffer_state[:pending_size] += buffer_state[:pending_size] + ::File.delete(file_path) rescue => e - @buffer_config[:logger].error("Failed to store items: #{e.message}") + @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}") end end + @buffer_config[:logger].info("Loaded buffer state from files") end - def buffer_initialize - @buffer_state[:pending_items] = [] - @buffer_state[:pending_size] = 0 + def flush_buffer_files + Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| + @buffer_config[:logger].info("Flushing from buffer state file: #{file_path}") + Thread.new { perform_flush([], file_path) } + end end end end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 0f9a6c21..b0c2d426 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,15 +20,13 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL) - @retry_count = 3 - @retry_delay = 10 + def initialize(kusto_logstash_configuration, logger, latch_timeout = 60, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool + @latch_timeout = latch_timeout @logger = logger #Validate and assign kusto_logstash_configuration.validate_config() @kusto_logstash_configuration = kusto_logstash_configuration - @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto @@ -77,8 +75,6 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD end @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) - @ingestion_properties.setReportLevel(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportLevel::FAILURES_AND_SUCCESSES) - @ingestion_properties.setReportMethod(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportMethod::TABLE) if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @@ -87,15 +83,6 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) end - # retry_policy = Java::com.azure.storage.common.policy - # duration = Java::java.time.Duration.ofSeconds(5) - - # fixed_delay_options = Java::com.azure.core.http.policy.FixedDelayOptions.new(1,duration) - # retry_options = Java::com.azure.core.http.policy.RetryOptions.new(fixed_delay_options) - # req_retry_options = Java::com.azure.storage.common.policy.RequestRetryOptions.fromRetryOptions(retry_options, Java::java.time.Duration.ofSeconds(10), "") - - # queued_ingest_client = @kusto_client.to_java(Java::com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl) - # queued_ingest_client.setQueueRequestOptions(req_retry_options) @logger.debug('Kusto resources are ready.') end @@ -103,32 +90,62 @@ def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") end + exception = nil @workers_pool.post do - LogStash::Util.set_thread_name("Kusto to ingest data #{JRuby.reference(Thread.current).native_thread.id}") - upload(data) + LogStash::Util.set_thread_name("Kusto to ingest data") + begin + upload(data) + rescue => e + @logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace) + exception = e + end + end + # Wait for the task to complete and check for exceptions + @workers_pool.shutdown + @workers_pool.wait_for_termination + + if exception + @logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace) + raise exception end + rescue Exception => e + @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace) + raise e end def upload(data) - begin - @logger.debug("Sending data to Kusto") - if data.size > 0 + @logger.info("Sending data to Kusto") + + if data.size > 0 + ingestionLatch = java.util.concurrent.CountDownLatch.new(1) + thread_exception = nil + + Thread.new do + begin data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - else - @logger.warn("Data is empty and is not ingested.") + ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + rescue => e + @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) + thread_exception = e + ensure + ingestionLatch.countDown() + end end - @logger.debug("Data sent to Kusto.") - rescue => e - if tries < @retry_count - tries += 1 - logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) - sleep @retry_delay - retry - else - logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) + + # Wait for the ingestion to complete with a timeout + if !ingestionLatch.await(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + @logger.error('Ingestion timed out, possible network issue.') + raise 'Ingestion timed out, possible network issue.' end + # Raise the exception from the thread if it occurred + raise thread_exception if thread_exception + else + @logger.warn("Data is empty and is not ingested.") end + @logger.info("Data sent to Kusto.") + rescue => e + @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace) + raise e # Raise the original error if ingestion fails end def stop From 0ce8a0846926a6896c1059a9277c99f981789c3e Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Thu, 26 Dec 2024 11:23:28 +0530 Subject: [PATCH 6/8] Add completableFuture with timeout --- lib/logstash/outputs/kusto/ingestor.rb | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index b0c2d426..969653f9 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -115,28 +115,29 @@ def upload_async(data) def upload(data) @logger.info("Sending data to Kusto") - + if data.size > 0 - ingestionLatch = java.util.concurrent.CountDownLatch.new(1) thread_exception = nil - Thread.new do + future = java.util.concurrent.CompletableFuture.supplyAsync do begin data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) rescue => e @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) thread_exception = e - ensure - ingestionLatch.countDown() end end - # Wait for the ingestion to complete with a timeout - if !ingestionLatch.await(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + begin + future.get(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + rescue java.util.concurrent.TimeoutException => e @logger.error('Ingestion timed out, possible network issue.') - raise 'Ingestion timed out, possible network issue.' + thread_exception = 'Ingestion timed out, possible network issue.' + rescue java.util.concurrent.ExecutionException => e + thread_exception = e.cause end + # Raise the exception from the thread if it occurred raise thread_exception if thread_exception else From b3015a4ca1655fb5ab793b18a1a25dc886614fa2 Mon Sep 17 00:00:00 2001 From: monishkadas_microsoft Date: Thu, 2 Jan 2025 14:09:03 +0530 Subject: [PATCH 7/8] Updated README.md and removed interval.rb --- README.md | 7 +-- lib/logstash/outputs/kusto.rb | 1 - lib/logstash/outputs/kusto/interval.rb | 81 -------------------------- 3 files changed, 2 insertions(+), 87 deletions(-) delete mode 100755 lib/logstash/outputs/kusto/interval.rb diff --git a/README.md b/README.md index bd113fbc..c3e2f1e0 100755 --- a/README.md +++ b/README.md @@ -48,8 +48,7 @@ output { proxy_protocol => <"http"|"https"> max_size => 10 max_interval => 10 - max_retries => 3 - failed_items_path => "" + latch_timeout => 60 } } ``` @@ -64,14 +63,12 @@ More information about configuring Logstash can be found in the [logstash config | **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional | | **database** | Database name to place events | Required | | **table** | Target table name to place events | Required | -| **failed_items_path** | Path to store failed items when max_retries is reached. Set to nil to disable persistence to file (May cause data loss). | Required | | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional | | **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional | | **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional | | **proxy_protocol** | The proxy server protocol, is one of http or https. | Optional | | **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional | -| **max_interval** | Maximum interval (in seconds) before the buffer gets flushed, defaults to 10. | Optional | -| **max_retries** | Maximum number of retries before the flush fails. Defaults to 3. | Optional | +| **latch_timeout** | Latch timeout in seconds, defaults to 60. This is the maximum wait time after which the flushing attempt is timed out and the network is considered to be down. The system waits for the network to be back to retry flushing the same batch. | Optional | > Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 1011e958..a7d2ac9a 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -5,7 +5,6 @@ require 'logstash/errors' require 'logstash/outputs/kusto/ingestor' -require 'logstash/outputs/kusto/interval' require 'logstash/outputs/kusto/custom_size_based_buffer' require 'logstash/outputs/kusto/kustoLogstashConfiguration' diff --git a/lib/logstash/outputs/kusto/interval.rb b/lib/logstash/outputs/kusto/interval.rb deleted file mode 100755 index 6ba8d4a4..00000000 --- a/lib/logstash/outputs/kusto/interval.rb +++ /dev/null @@ -1,81 +0,0 @@ -# encoding: utf-8 - -require 'logstash/outputs/base' -require 'logstash/namespace' -require 'logstash/errors' - -class LogStash::Outputs::Kusto < LogStash::Outputs::Base - ## - # Bare-bones utility for running a block of code at an interval. - # - class Interval - ## - # Initializes a new Interval with the given arguments and starts it - # before returning it. - # - # @param interval [Integer] (see: Interval#initialize) - # @param procsy [#call] (see: Interval#initialize) - # - # @return [Interval] - # - def self.start(interval, procsy) - new(interval, procsy).tap(&:start) - end - - ## - # @param interval [Integer]: time in seconds to wait between calling the given proc - # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. - def initialize(interval, procsy) - @interval = interval - @procsy = procsy - - # Mutex, ConditionVariable, etc. - @mutex = Mutex.new - @sleeper = ConditionVariable.new - end - - ## - # Starts the interval, or returns if it has already been started. - # - # @return [void] - def start - @mutex.synchronize do - return if @thread && @thread.alive? - - @thread = Thread.new { run } - end - end - - ## - # Stop the interval. - # Does not interrupt if execution is in-progress. - def stop - @mutex.synchronize do - @stopped = true - end - - @thread && @thread.join - end - - ## - # @return [Boolean] - def alive? - @thread && @thread.alive? - end - - private - - def run - @mutex.synchronize do - loop do - @sleeper.wait(@mutex, @interval) - break if @stopped - - @procsy.call - end - end - ensure - @sleeper.broadcast - end - end -end From 0e9f300e0aa66c5d05f9920971bced779aefe820 Mon Sep 17 00:00:00 2001 From: monishkadas_microsoft Date: Thu, 2 Jan 2025 15:04:46 +0530 Subject: [PATCH 8/8] Removed network_available method --- .../outputs/kusto/custom_size_based_buffer.rb | 21 +-- spec/outputs/kusto/interval_spec.rb | 69 -------- spec/outputs/kusto_spec.rb | 159 ------------------ 3 files changed, 3 insertions(+), 246 deletions(-) delete mode 100644 spec/outputs/kusto/interval_spec.rb delete mode 100755 spec/outputs/kusto_spec.rb diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index ce4b8a6d..7ce13db2 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -139,29 +139,14 @@ def perform_flush(events, file_path = nil) rescue => e @buffer_config[:logger].error("Flush failed: #{e.message}") @buffer_state[:network_down] = true - - while true - sleep(2) # Wait before checking network availability again - if network_available? - @buffer_config[:logger].info("Network is back up. Retrying flush.") - retry - end - end - + sleep(120) # Wait before checking network availability again + @buffer_config[:logger].info("Retrying flush.") + retry ensure @flush_mutex.unlock end end - def network_available? - begin - uri = URI('http://www.google.com') - response = Net::HTTP.get_response(uri) - response.is_a?(Net::HTTPSuccess) - rescue - false - end - end def save_buffer_to_file(events) buffer_state_copy = { diff --git a/spec/outputs/kusto/interval_spec.rb b/spec/outputs/kusto/interval_spec.rb deleted file mode 100644 index ccd9cf46..00000000 --- a/spec/outputs/kusto/interval_spec.rb +++ /dev/null @@ -1,69 +0,0 @@ -# # spec/interval_test.rb -# require 'rspec' -# require 'logstash/outputs/kusto/interval' - - -# describe LogStash::Outputs::Kusto::Interval do -# let(:interval_time) { 1 } -# let(:procsy) { double("procsy", call: true) } - -# describe '#initialize' do -# it 'initializes with the correct interval and procsy' do -# interval = described_class.new(interval_time, procsy) -# expect(interval.instance_variable_get(:@interval)).to eq(interval_time) -# expect(interval.instance_variable_get(:@procsy)).to eq(procsy) -# end -# end - -# describe '#start' do -# it 'starts the interval thread' do -# interval = described_class.new(interval_time, procsy) -# interval.start -# expect(interval.alive?).to be true -# interval.stop -# end - -# it 'does not start a new thread if already started' do -# interval = described_class.new(interval_time, procsy) -# interval.start -# first_thread = interval.instance_variable_get(:@thread) -# interval.start -# second_thread = interval.instance_variable_get(:@thread) -# expect(first_thread).to eq(second_thread) -# interval.stop -# end -# end - -# describe '#stop' do -# it 'stops the interval thread' do -# interval = described_class.new(interval_time, procsy) -# interval.start -# interval.stop -# expect(interval.alive?).to be false -# end -# end - -# describe '#alive?' do -# it 'returns true if the thread is alive' do -# interval = described_class.new(interval_time, procsy) -# interval.start -# expect(interval.alive?).to be true -# interval.stop -# end - -# it 'returns false if the thread is not alive' do -# interval = described_class.new(interval_time, procsy) -# expect(interval.alive?).to be false -# end -# end - -# describe 'interval execution' do -# it 'calls the proc at the specified interval' do -# interval = described_class.new(interval_time, procsy) -# expect(procsy).to receive(:call).at_least(:twice) -# interval.start -# sleep(2.5) -# interval.stop -# end -# end -# end \ No newline at end of file diff --git a/spec/outputs/kusto_spec.rb b/spec/outputs/kusto_spec.rb deleted file mode 100755 index 967f9a11..00000000 --- a/spec/outputs/kusto_spec.rb +++ /dev/null @@ -1,159 +0,0 @@ -# encoding: utf-8 -require_relative "../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/codecs/plain' -require 'logstash/event' - -describe LogStash::Outputs::Kusto do - - let(:options) { { "path" => "./kusto_tst/%{+YYYY-MM-dd-HH-mm}", - "ingest_url" => "https://ingest-sdkse2etest.eastus.kusto.windows.net/", - "app_id" => "myid", - "app_key" => "mykey", - "app_tenant" => "mytenant", - "database" => "mydatabase", - "table" => "mytable", - "json_mapping" => "mymapping", - "proxy_host" => "localhost", - "proxy_port" => 3128, - "proxy_protocol" => "https", - "max_size" => 2000, - "max_interval" => 10 - } } - - describe '#initialize' do - it 'initializes with the correct options' do - RSpec.configuration.reporter.message("Running test: initializes with the correct options") - kusto = described_class.new(options.merge("app_key" => LogStash::Util::Password.new("mykey"))) - expect(kusto.instance_variable_get(:@path)).to eq("./kusto_tst/%{+YYYY-MM-dd-HH-mm}") - expect(kusto.instance_variable_get(:@ingest_url)).to eq("https://ingest-sdkse2etest.eastus.kusto.windows.net/") - expect(kusto.instance_variable_get(:@app_id)).to eq("myid") - expect(kusto.instance_variable_get(:@app_key).value).to eq("mykey") - expect(kusto.instance_variable_get(:@app_tenant)).to eq("mytenant") - expect(kusto.instance_variable_get(:@database)).to eq("mydatabase") - expect(kusto.instance_variable_get(:@table)).to eq("mytable") - expect(kusto.instance_variable_get(:@json_mapping)).to eq("mymapping") - expect(kusto.instance_variable_get(:@proxy_host)).to eq("localhost") - expect(kusto.instance_variable_get(:@proxy_port)).to eq(3128) - expect(kusto.instance_variable_get(:@proxy_protocol)).to eq("https") - expect(kusto.instance_variable_get(:@max_size)).to eq(2000) - expect(kusto.instance_variable_get(:@max_interval)).to eq(10) - RSpec.configuration.reporter.message("Completed test: initializes with the correct options") - end - end - - describe '#multi_receive_encoded' do - it 'processes events and adds them to the buffer' do - RSpec.configuration.reporter.message("Running test: processes events and adds them to the buffer") - kusto = described_class.new(options) - kusto.register - - events = [LogStash::Event.new("message" => "test1"), LogStash::Event.new("message" => "test2")] - encoded_events = events.map { |e| [e, e.to_json] } - - # Temporarily disable automatic flushing for the test - buffer = kusto.instance_variable_get(:@buffer) - allow(buffer).to receive(:buffer_flush) - - # Clear the buffer before the test - buffer.instance_variable_set(:@buffer_state, { pending_items: [], pending_size: 0, last_flush: Time.now.to_i }) - - kusto.multi_receive_encoded(encoded_events) - - pending_items = buffer.instance_variable_get(:@buffer_state)[:pending_items] - RSpec.configuration.reporter.message("Pending items in buffer: #{pending_items.inspect}") - - expect(pending_items.size).to eq(2) - RSpec.configuration.reporter.message("Completed test: processes events and adds them to the buffer") - end - - it 'handles errors during event processing' do - RSpec.configuration.reporter.message("Running test: handles errors during event processing") - kusto = described_class.new(options) - kusto.register - - allow(kusto.instance_variable_get(:@buffer)).to receive(:<<).and_raise(StandardError.new("Test error")) - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - - expect { kusto.multi_receive_encoded(encoded_events) }.not_to raise_error - RSpec.configuration.reporter.message("Completed test: handles errors during event processing") - end - end - - describe '#register' do - it 'raises an error for invalid configurations' do - RSpec.configuration.reporter.message("Running test: raises an error for invalid configurations") - invalid_options = options.merge("ingest_url" => nil) - expect { described_class.new(invalid_options).register }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: raises an error for invalid configurations") - end - end - - describe '#flush_buffer' do - - it 'flushes the buffer when max_size is reached' do - RSpec.configuration.reporter.message("Running test: flushes the buffer when max_size is reached") - kusto = described_class.new(options.merge("max_size" => 1)) # Set max_size to 1MB for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - # Ensure upload_async is called only once - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything).once - kusto.multi_receive_encoded(encoded_events) - - # Trigger the buffer flush manually - buffer = kusto.instance_variable_get(:@buffer) - buffer.send(:buffer_flush, force: true) - - RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_size is reached") - end - - it 'flushes the buffer when max_interval is reached' do - RSpec.configuration.reporter.message("Running test: flushes the buffer when max_interval is reached") - kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - kusto.multi_receive_encoded(encoded_events) - sleep(2) # Wait for the interval to pass - - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything) - kusto.flush_buffer(encoded_events) # Pass the encoded events here - RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_interval is reached") - end - - it 'eventually flushes without receiving additional events based on max_interval' do - RSpec.configuration.reporter.message("Running test: eventually flushes without receiving additional events based on max_interval") - kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - kusto.multi_receive_encoded(encoded_events) - - # Wait for the interval to pass - sleep(2) - - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything) - kusto.flush_buffer(encoded_events) # Pass the encoded events here - RSpec.configuration.reporter.message("Completed test: eventually flushes without receiving additional events based on max_interval") - end - end - - describe '#close' do - it 'shuts down the buffer and ingestor' do - RSpec.configuration.reporter.message("Running test: shuts down the buffer and ingestor") - kusto = described_class.new(options) - kusto.register - - expect(kusto.instance_variable_get(:@buffer)).to receive(:shutdown) - expect(kusto.instance_variable_get(:@ingestor)).to receive(:stop) - - kusto.close - RSpec.configuration.reporter.message("Completed test: shuts down the buffer and ingestor") - end - end -end \ No newline at end of file