From 3695a35a66ee8ae27133e96331d6a869a1aa56f6 Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Tue, 26 Aug 2025 10:25:32 +0530
Subject: [PATCH 1/7] *Add the worker thread-id to the temp file path to avoid
 race conditions

---
 lib/logstash/outputs/kusto.rb | 105 +++++++++++-----------------------
 1 file changed, 34 insertions(+), 71 deletions(-)

diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index cd61d5d2..bbd1913b 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -67,7 +67,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
 
   # If `false`, the plugin will disregard temp files found
   config :recovery, validate: :boolean, default: true
-
   # The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal.
   config :ingest_url, validate: :string, required: true
 
@@ -95,7 +94,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
 
   # Mapping name - deprecated, use json_mapping
   config :mapping, validate: :string, deprecated: true
-
   # Determines if local files used for temporary storage will be deleted
   # after upload is successful
   config :delete_temp_files, validate: :boolean, default: true
@@ -132,13 +130,14 @@ def register
       final_mapping = mapping
     end
 
+    @current_ls_worker_thread_id = Thread.current.object_id
     # TODO: add id to the tmp path to support multiple outputs of the same type.
     # TODO: Fix final_mapping when dynamic routing is supported
     # add fields from the meta that will note the destination of the events in the file
     @path = if dynamic_event_routing
               File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}")
             else
-              File.expand_path("#{path}.#{database}.#{table}")
+              File.expand_path("#{path}.#{database}.#{table}.#{@current_ls_worker_thread_id}")
             end
     validate_path
 
@@ -150,12 +149,18 @@ def register
     end
     @failure_path = File.join(@file_root, @filename_failure)
 
-    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
-                                                  max_threads: upload_concurrent_count,
-                                                  max_queue: upload_queue_size,
-                                                  fallback_policy: :caller_runs)
+    executor = Concurrent::ThreadPoolExecutor.new(
+      min_threads: 1,
+      max_threads: upload_concurrent_count,
+      max_queue: upload_queue_size,
+      fallback_policy: :caller_runs
+    )
 
-    @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
+    @ingestor = Ingestor.new(
+      ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth,
+      database, table, final_mapping, delete_temp_files,
+      proxy_host, proxy_port, proxy_protocol, @logger, executor
+    )
 
     # send existing files
     recover_past_files if recovery
@@ -173,23 +178,22 @@ def register
   end
 
   private
+
   def validate_path
     if (root_directory =~ FIELD_REF) != nil
       @logger.error('The starting part of the path should not be dynamic.', path: @path)
       raise LogStash::ConfigurationError.new('The starting part of the path should not be dynamic.')
     end
 
-    if !path_with_field_ref?
+    unless path_with_field_ref?
       @logger.error('Path should include some time related fields to allow for file rotation.', path: @path)
       raise LogStash::ConfigurationError.new('Path should include some time related fields to allow for file rotation.')
     end
   end
 
-  private
   def root_directory
     parts = @path.split(File::SEPARATOR).reject(&:empty?)
     if Gem.win_platform?
-      # First part is the drive letter
       parts[1]
     else
       parts.first
@@ -197,6 +201,7 @@ def root_directory
   end
 
   public
+
   def multi_receive_encoded(events_and_encoded)
     encoded_by_path = Hash.new { |h, k| h[k] = [] }
 
@@ -208,11 +213,9 @@ def multi_receive_encoded(events_and_encoded)
     @io_mutex.synchronize do
       encoded_by_path.each do |path, chunks|
         fd = open(path)
-        # append to the file
         chunks.each { |chunk| fd.write(chunk) }
         fd.flush unless @flusher && @flusher.alive?
       end
-
       close_stale_files if @stale_cleanup_type == 'events'
     end
   end
@@ -222,29 +225,26 @@ def close
     @cleaner.stop unless @cleaner.nil?
     @io_mutex.synchronize do
       @logger.debug('Close: closing files')
-
       @files.each do |path, fd|
         begin
           fd.close
           @logger.debug("Closed file #{path}", fd: fd)
-
           kusto_send_file(path)
         rescue Exception => e
           @logger.error('Exception while flushing and closing files.', exception: e)
         end
       end
     end
-
     @ingestor.stop unless @ingestor.nil?
   end
 
   private
+
   def inside_file_root?(log_path)
     target_file = File.expand_path(log_path)
-    return target_file.start_with?("#{@file_root}/")
+    target_file.start_with?("#{@file_root}/")
   end
 
-  private
   def event_path(event)
     file_output_path = generate_filepath(event)
     if path_with_field_ref? && !inside_file_root?(file_output_path)
@@ -254,77 +254,60 @@ def event_path(event)
       file_output_path = @failure_path
     end
     @logger.debug('Writing event to tmp file.', filename: file_output_path)
-
     file_output_path
   end
 
-  private
   def generate_filepath(event)
     event.sprintf(@path)
   end
 
-  private
   def path_with_field_ref?
     path =~ FIELD_REF
   end
 
-  private
   def extract_file_root
     parts = File.expand_path(path).split(File::SEPARATOR)
     parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR)
   end
 
-  # the back-bone of @flusher, our periodic-flushing interval.
-  private
   def flush_pending_files
     @io_mutex.synchronize do
       @logger.debug('Starting flush cycle')
-
       @files.each do |path, fd|
         @logger.debug('Flushing file', path: path, fd: fd)
         fd.flush
       end
     end
   rescue Exception => e
-    # squash exceptions caught while flushing after logging them
     @logger.error('Exception flushing files', exception: e.message, backtrace: e.backtrace)
   end
 
-  # every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
-  private
   def close_stale_files
     now = Time.now
     return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
-
     @logger.debug('Starting stale files cleanup cycle', files: @files)
-    inactive_files = @files.select { |path, fd| not fd.active }
+    inactive_files = @files.select { |path, fd| !fd.active }
     @logger.debug("#{inactive_files.count} stale files found", inactive_files: inactive_files)
     inactive_files.each do |path, fd|
       @logger.info("Closing file #{path}")
       fd.close
       @files.delete(path)
-
       kusto_send_file(path)
     end
-
     # mark all files as inactive, a call to write will mark them as active again
     @files.each { |path, fd| fd.active = false }
     @last_stale_cleanup_cycle = now
   end
 
-  private
   def cached?(path)
     @files.include?(path) && !@files[path].nil?
   end
 
-  private
   def deleted?(path)
     !File.exist?(path)
   end
 
-  private
   def open(path)
     return @files[path] if !deleted?(path) && cached?(path)
-
     if deleted?(path)
       if @create_if_deleted
         @logger.debug('Required file does not exist, creating it.', path: path)
         @files.delete(path)
@@ -333,11 +316,9 @@ def open(path)
         return @files[path] if cached?(path)
       end
     end
-
     @logger.info('Opening file', path: path)
-
     dir = File.dirname(path)
-    if !Dir.exist?(dir)
+    unless Dir.exist?(dir)
       @logger.info('Creating directory', directory: dir)
       if @dir_mode != -1
         FileUtils.mkdir_p(dir, mode: @dir_mode)
@@ -345,55 +326,37 @@ def open(path)
         FileUtils.mkdir_p(dir)
       end
     end
-
-    # work around a bug opening fifos (bug JRUBY-6280)
     stat = begin
-             File.stat(path)
-           rescue
-             nil
-           end
-    fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
-           java.io.FileWriter.new(java.io.File.new(path))
-         elsif @file_mode != -1
-           File.new(path, 'a+', @file_mode)
-         else
-           File.new(path, 'a+')
-         end
-    # fd = if @file_mode != -1
-    #        File.new(path, 'a+', @file_mode)
-    #      else
-    #        File.new(path, 'a+')
-    #      end
-    # end
+      File.stat(path)
+    rescue
+      nil
+    end
+    fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
+      java.io.FileWriter.new(java.io.File.new(path))
+    elsif @file_mode != -1
+      File.new(path, 'a+', @file_mode)
+    else
+      File.new(path, 'a+')
+    end
     @files[path] = IOWriter.new(fd)
   end
 
-  private
  def kusto_send_file(file_path)
    @ingestor.upload_async(file_path, delete_temp_files)
  end
 
-  private
  def recover_past_files
    require 'find'
-
-    # we need to find the last "regular" part in the path before any dynamic vars
    path_last_char = @path.length - 1
-
    pattern_start = @path.index('%') || path_last_char
    last_folder_before_pattern = @path.rindex('/', pattern_start) || path_last_char
    new_path = path[0..last_folder_before_pattern]
-
    begin
      return unless Dir.exist?(new_path)
      @logger.info("Going to recover old files in path #{@new_path}")
-
      old_files = Find.find(new_path).select { |p| /.*\.#{database}\.#{table}$/ =~ p }
      @logger.info("Found #{old_files.length} old file(s), sending them now...")
-
-      old_files.each do |file|
-        kusto_send_file(file)
-      end
+      old_files.each { |file| kusto_send_file(file) }
    rescue Errno::ENOENT => e
      @logger.warn('No such file or directory', exception: e.class, message: e.message, path: new_path, backtrace: e.backtrace)
    end
@@ -402,6 +365,8 @@ def recover_past_files
 
 # wrapper class
 class IOWriter
+  attr_accessor :active
+
   def initialize(io)
     @io = io
   end
@@ -417,11 +382,9 @@ def flush
 
   def method_missing(method_name, *args, &block)
     if @io.respond_to?(method_name)
-
       @io.send(method_name, *args, &block)
     else
       super
     end
   end
-  attr_accessor :active
 end

From eaddfdeb284d3082e858c0e346971963775e2839 Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Tue, 26 Aug 2025 15:32:07 +0530
Subject: [PATCH 2/7] *Change the thread-id usage in multi_receive_encoded

---
 lib/logstash/outputs/kusto.rb | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index bbd1913b..07d4dd9b 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -130,14 +130,13 @@ def register
       final_mapping = mapping
     end
 
-    @current_ls_worker_thread_id = Thread.current.object_id
     # TODO: add id to the tmp path to support multiple outputs of the same type.
     # TODO: Fix final_mapping when dynamic routing is supported
     # add fields from the meta that will note the destination of the events in the file
     @path = if dynamic_event_routing
               File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}")
             else
-              File.expand_path("#{path}.#{database}.#{table}.#{@current_ls_worker_thread_id}")
+              File.expand_path("#{path}.#{database}.#{table}")
             end
     validate_path
 
@@ -204,9 +203,9 @@ def root_directory
 
   def multi_receive_encoded(events_and_encoded)
     encoded_by_path = Hash.new { |h, k| h[k] = [] }
-
+    thread_id = Thread.current.object_id
     events_and_encoded.each do |event, encoded|
-      file_output_path = event_path(event)
+      file_output_path = event_path(event, thread_id)
       encoded_by_path[file_output_path] << encoded
     end
 
@@ -245,8 +244,8 @@ def inside_file_root?(log_path)
     target_file.start_with?("#{@file_root}/")
   end
 
-  def event_path(event)
-    file_output_path = generate_filepath(event)
+  def event_path(event, thread_id)
+    file_output_path = generate_filepath(event, thread_id)
     if path_with_field_ref? && !inside_file_root?(file_output_path)
       @logger.warn('The event tried to write outside the files root, writing the event to the failure file', event: event, filename: @failure_path)
       file_output_path = @failure_path
@@ -257,8 +256,8 @@ def event_path(event)
     file_output_path
   end
 
-  def generate_filepath(event)
-    event.sprintf(@path)
+  def generate_filepath(event, thread_id)
+    event.sprintf(@path) + ".#{thread_id}"
   end
 
   def path_with_field_ref?

From 729d9263f421e608d193d4d42718922d54f3b91c Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Tue, 26 Aug 2025 17:49:22 +0530
Subject: [PATCH 3/7] *Bump versions, Update README

---
 .github/workflows/build.yml |  2 +-
 README.md                   |  4 +++-
 random-data-json.sh         | 28 ++++++++++++++++++++++++++++
 random-data.sh              |  9 +++++++++
 version                     |  2 +-
 5 files changed, 42 insertions(+), 3 deletions(-)
 create mode 100755 random-data-json.sh
 create mode 100755 random-data.sh

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index a0e6ca3f..12fe128b 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -16,7 +16,7 @@ jobs:
     strategy:
       matrix:
         logstash: [
-          { version: '8.7.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz" , main: 'true' }
+          { version: '9.1.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-9.1.0-linux-x86_64.tar.gz" , main: 'true' }
         ]
     env:
       LOGSTASH_SOURCE: 1
diff --git a/README.md b/README.md
index b256a266..edb3aa42 100755
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ This connector forwards data to
 
 ## Requirements
 
-- Logstash version 6+. [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
+- The latest version is tested with Logstash 9.1+ (which uses JDK 21). [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
 - Azure Data Explorer cluster with a database. Read [Create a cluster and database](https://docs.microsoft.com/en-us/azure/data-explorer/create-cluster-database-portal) for more information.
 - AAD Application credentials with permission to ingest data into Azure Data Explorer. Read [Creating an AAD Application](https://docs.microsoft.com/en-us/azure/kusto/management/access-control/how-to-provision-aad-app) for more information.
@@ -81,6 +81,8 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox
 
 | Version | Release Date | Notes |
 | --- | --- | --- |
+| 2.1.1 | 2025-09-25 | - Fix #84: add thread-id to the file path to avoid race conditions in multi-worker scenarios |
+| 2.1.0 | 2025-09-25 | - Bump SDK versions |
 | 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
 | 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK |
 | 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
diff --git a/random-data-json.sh b/random-data-json.sh
new file mode 100755
index 00000000..8c68a4b2
--- /dev/null
+++ b/random-data-json.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+# you will need to install jq for JSON handling
+
+while true
+do
+  # Generate random IP
+  random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ */./g')
+
+  # Generate random response size and HTTP status
+  random_size=$(( (RANDOM % 65535) + 1 ))
+  status_codes=(200 201 400 404 500)
+  random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}
+
+  # Generate current timestamp in ISO 8601
+  timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
+
+  # Random endpoint
+  endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
+  random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}
+
+  # Construct JSON log
+  json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')
+
+
+  echo "$json_log" | tee -a '/tmp/jsonlogs.txt'
+
+  sleep 0.1
+done
diff --git a/random-data.sh b/random-data.sh
new file mode 100755
index 00000000..16ad9dcd
--- /dev/null
+++ b/random-data.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+while true
+do
+  random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ */./g')
+  random_size=$(( (RANDOM % 65535) + 1 ))
+  current_date_time=$(date '+%d/%b/%Y:%H:%M:%S %z')
+  echo "$random_ip - - [$current_date_time] \"GET /data.php HTTP/1.1\" 200 $random_size" | tee -a '/tmp/curllogs.txt'
+  sleep 0.0000001s
+done
\ No newline at end of file
diff --git a/version b/version
index 50aea0e7..7c327287 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-2.1.0
\ No newline at end of file
+2.1.1
\ No newline at end of file

From 35302f48af2ba2791b07683da27e6ec428cfa74b Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Tue, 26 Aug 2025 17:58:41 +0530
Subject: [PATCH 4/7] *Fix issue in the new_path variable in kusto.rb and
 reduce sleep in the shell script

---
 lib/logstash/outputs/kusto.rb | 2 +-
 random-data.sh                | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index 07d4dd9b..553a29d6 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -352,7 +352,7 @@ def recover_past_files
     new_path = path[0..last_folder_before_pattern]
     begin
       return unless Dir.exist?(new_path)
-      @logger.info("Going to recover old files in path #{@new_path}")
+      @logger.info("Going to recover old files in path #{new_path}")
       old_files = Find.find(new_path).select { |p| /.*\.#{database}\.#{table}$/ =~ p }
       @logger.info("Found #{old_files.length} old file(s), sending them now...")
       old_files.each { |file| kusto_send_file(file) }
diff --git a/random-data.sh b/random-data.sh
index 16ad9dcd..9c7f6153 100755
--- a/random-data.sh
+++ b/random-data.sh
@@ -5,5 +5,5 @@ do
   random_size=$(( (RANDOM % 65535) + 1 ))
   current_date_time=$(date '+%d/%b/%Y:%H:%M:%S %z')
   echo "$random_ip - - [$current_date_time] \"GET /data.php HTTP/1.1\" 200 $random_size" | tee -a '/tmp/curllogs.txt'
-  sleep 0.0000001s
+  sleep 0.001s
 done
\ No newline at end of file

From 58130513d752d154e5d9b155b28974363625353d Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Wed, 27 Aug 2025 06:58:41 +0530
Subject: [PATCH 5/7] *Add additional log line to track files sent to Kusto

---
 lib/logstash/outputs/kusto/ingestor.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb
index f2e10601..8512a073 100755
--- a/lib/logstash/outputs/kusto/ingestor.rb
+++ b/lib/logstash/outputs/kusto/ingestor.rb
@@ -162,7 +162,7 @@ def upload(path, delete_on_success)
         @logger.warn("File #{path} is an empty file and is not ingested.")
       end
       File.delete(path) if delete_on_success
-      @logger.debug("File #{path} sent to kusto.")
+      @logger.info("File #{path} sent to kusto.")
     rescue Errno::ENOENT => e
       @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
     rescue Java::JavaNioFile::NoSuchFileException => e

From 80d8f748715929b77aaef67b298766873c8b41fd Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Wed, 27 Aug 2025 09:53:45 +0530
Subject: [PATCH 6/7] *Format ingestor.rb file

---
 lib/logstash/outputs/kusto/ingestor.rb | 45 +++++++++++++------------
 1 file changed, 22 insertions(+), 23 deletions(-)

diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb
index 8512a073..c3c2a2d0 100755
--- a/lib/logstash/outputs/kusto/ingestor.rb
+++ b/lib/logstash/outputs/kusto/ingestor.rb
@@ -20,10 +20,10 @@ class Ingestor
   LOW_QUEUE_LENGTH = 3
   FIELD_REF = /%\{[^}]+\}/
 
-  def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
+  def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL)
     @workers_pool = threadpool
     @logger = logger
-    validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
+    validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
     @logger.info('Preparing Kusto resources.')
 
     kusto_java = Java::com.microsoft.azure.kusto
@@ -38,22 +38,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
     is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
     # Create a connection string
     kusto_connection_string = if is_managed_identity
-                                if is_system_assigned_managed_identity
-                                  @logger.info('Using system managed identity.')
-                                  kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
-                                else
-                                  @logger.info('Using user managed identity.')
-                                  kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
-                                end
+      if is_system_assigned_managed_identity
+        @logger.info('Using system managed identity.')
+        kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
       else
-                                if cli_auth
-                                  @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
-                                  kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
-                                else
-                                  @logger.info('Using app id and app key.')
-                                  kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
-                                end
+        @logger.info('Using user managed identity.')
+        kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
       end
+    else
+      if cli_auth
+        @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
+        kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
+      else
+        @logger.info('Using app id and app key.')
+        kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
+      end
+    end
 
     # @logger.debug(Gem.loaded_specs.to_s)
     # Unfortunately there's no way to avoid using the gem/plugin name directly...
@@ -62,8 +62,8 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
 
     java_util = Java::java.util
     # kusto_connection_string.setClientVersionForTracing(name_for_tracing)
-    version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
-    kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", java_util.Collections.emptyMap());
+    version_for_tracing = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
+    kusto_connection_string.setConnectorDetails("Logstash", version_for_tracing.to_s, "", "", false, "", java_util.Collections.emptyMap())
     @kusto_client = begin
       if is_direct_conn
         kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
@@ -91,7 +91,7 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
     @logger.debug('Kusto resources are ready.')
   end
 
-  def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
+  def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
     # Add an additional validation and fail this upfront
     if app_id.nil? && app_key.nil? && managed_identity_id.nil?
       if cli_auth
@@ -100,7 +100,7 @@ def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_k
         @logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
         raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
       end
-      end
+    end
     if database =~ FIELD_REF
       @logger.error('database config value should not be dynamic.', database)
       raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
     end
@@ -116,11 +116,10 @@ def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_k
       raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
     end
 
-    if not(["https", "http"].include? proxy_protocol)
+    unless ["https", "http"].include? proxy_protocol
       @logger.error('proxy_protocol has to be http or https.', proxy_protocol)
       raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
     end
-
   end
 
   def upload_async(path, delete_on_success)
@@ -156,7 +155,7 @@ def upload(path, delete_on_success)
     # end
 
     if file_size > 0
-      file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path); # 0 - let the sdk figure out the size of the file
+      file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path) # 0 - let the sdk figure out the size of the file
       @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
     else
       @logger.warn("File #{path} is an empty file and is not ingested.")

From 0fd52a115a2fcd7aa51d73c07ed16c0f3e10efff Mon Sep 17 00:00:00 2001
From: ag-ramachandran
Date: Fri, 5 Sep 2025 13:29:06 +0530
Subject: [PATCH 7/7] *Update to use async future and delete only on success

---
 build.gradle                           |  1 -
 lib/logstash/outputs/kusto.rb          | 11 ++++-----
 lib/logstash/outputs/kusto/ingestor.rb | 33 +++++++++++++++++++++++++-
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/build.gradle b/build.gradle
index da8f5435..e8055f87 100644
--- a/build.gradle
+++ b/build.gradle
@@ -93,7 +93,6 @@ dependencies {
     implementation 'net.java.dev.jna:jna:5.13.0'
     implementation 'net.minidev:accessors-smart:2.5.2'
     implementation 'net.minidev:json-smart:2.5.2'
-    implementation 'org.apache.commons:commons-lang3:3.14.0'
     implementation 'org.apache.commons:commons-text:1.11.0'
     implementation 'org.apache.httpcomponents:httpclient:4.5.14'
     implementation 'org.apache.httpcomponents:httpcore:4.4.16'
diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index 553a29d6..251d64a1 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -203,9 +203,8 @@ def root_directory
 
   def multi_receive_encoded(events_and_encoded)
     encoded_by_path = Hash.new { |h, k| h[k] = [] }
-    thread_id = Thread.current.object_id
     events_and_encoded.each do |event, encoded|
-      file_output_path = event_path(event, thread_id)
+      file_output_path = event_path(event)
       encoded_by_path[file_output_path] << encoded
     end
 
@@ -244,8 +243,8 @@ def inside_file_root?(log_path)
     target_file.start_with?("#{@file_root}/")
   end
 
-  def event_path(event, thread_id)
-    file_output_path = generate_filepath(event, thread_id)
+  def event_path(event)
+    file_output_path = generate_filepath(event)
     if path_with_field_ref? && !inside_file_root?(file_output_path)
       @logger.warn('The event tried to write outside the files root, writing the event to the failure file', event: event, filename: @failure_path)
       file_output_path = @failure_path
@@ -256,8 +255,8 @@ def event_path(event)
     file_output_path
   end
 
-  def generate_filepath(event, thread_id)
-    event.sprintf(@path) + ".#{thread_id}"
+  def generate_filepath(event)
+    event.sprintf(@path)
   end
 
   def path_with_field_ref?
diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb
index c3c2a2d0..d273c793 100755
--- a/lib/logstash/outputs/kusto/ingestor.rb
+++ b/lib/logstash/outputs/kusto/ingestor.rb
@@ -129,7 +129,7 @@ def upload_async(path, delete_on_success)
 
     @workers_pool.post do
       LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
-      upload(path, delete_on_success)
+      uploadV2(path, delete_on_success)
     end
   rescue Exception => e
     @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
@@ -176,6 +176,37 @@ def upload(path, delete_on_success)
     retry
   end
 
+  def uploadV2(path, delete_on_success)
+    file_size = File.size(path)
+    @logger.info("Ingesting #{file_size} bytes to database: #{@ingestion_properties.getDatabaseName} table: #{@ingestion_properties.getTableName}")
+    if file_size > 0
+      exceptions = Concurrent::Array.new
+      promise = Concurrent::Promises.future {
+        file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path) # let the sdk figure out the size of the file
+        ingest_result = @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
+      }
+      .rescue { |e|
+        @logger.error("Ingestion failed: #{e.message}. File at path #{path} will not be deleted.")
+        @logger.error("Ingestion failed: #{e.backtrace.join("\n")}")
+      }
+      .on_resolution do |fulfilled, value, reason, *args|
+        @logger.debug("Future fulfilled: #{fulfilled}, value: #{value}, reason: #{reason}, args: #{args}, class: #{value.class}")
+        if value.class == Java::ComMicrosoftAzureKustoIngestResult::IngestionStatusResult
+          isc = value.getIngestionStatusCollection()&.get(0)&.getStatus()
+          @logger.info("Ingestion status: #{isc}")
+          File.delete(path) if delete_on_success
+        else
+          @logger.warn("Ingestion status is a non-success status: #{value.class} - #{value}. File at path #{path} will not be deleted.")
+        end
+        if exceptions.size > 0
+          @logger.error("Ingestion failed with exceptions: #{exceptions.map(&:message).join(', ')}. File at path #{path} will not be deleted.")
+        end
+      end
+    else
+      @logger.warn("Data is empty and is not ingested.")
+    end # if file_size > 0
+  end # def uploadV2
+
   def stop
     @workers_pool.shutdown
     @workers_pool.wait_for_termination(nil) # block until it's done
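
Two hedged sketches follow, as addenda rather than as part of any patch above. Both are minimal, self-contained Ruby; every name not present in the diffs (base_path, per_thread_path, fake_ingest) is a hypothetical stand-in, not the plugin's API.

Patches 1-2 settle on suffixing each temp file path with the writing worker's Thread#object_id, so no two pipeline workers ever append to, rotate, or upload the same file. A minimal sketch of that idea:

    # Sketch only: per-thread temp-file naming, under assumed stand-in names.
    require 'tmpdir'

    base_path = File.join(Dir.tmpdir, 'kusto', 'tmp')

    # Patch 2 moves the suffix from register time (one id for the whole output)
    # to write time, so each worker thread derives its own file path.
    def per_thread_path(sprinted_path)
      "#{sprinted_path}.#{Thread.current.object_id}"
    end

    paths = Array.new(4) do
      Thread.new { per_thread_path("#{base_path}.db.tbl") }
    end.map(&:value)

    # Distinct per-thread files: no interleaved writes, and no two workers
    # racing to upload or delete the same temp file.
    puts paths.uniq.length == paths.length # => true

Patch 7's delete-only-on-success flow reduces to the same Concurrent::Promises shape with the ingest call stubbed out; fake_ingest stands in for @kusto_client.ingestFromFile, and the demo blocks on value only so it runs as a script (the plugin itself stays asynchronous on the worker pool):

    # Sketch only: delete the temp file only after a successful ingest.
    require 'concurrent'
    require 'tempfile'

    file = Tempfile.new('kusto-demo')
    file.write('payload')
    file.close

    fake_ingest = ->(path) { File.read(path).bytesize } # stand-in for ingestFromFile

    result = Concurrent::Promises.future { fake_ingest.call(file.path) }
               .rescue { |e| warn "ingest failed: #{e.message}"; nil }
               .value # demo-only; a rejected future surfaces here as nil
    File.delete(file.path) if result # on failure the file survives for recovery

    puts File.exist?(file.path) # => false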