diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index a0e6ca3f..12fe128b 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -16,7 +16,7 @@ jobs:
     strategy:
       matrix:
         logstash: [
-          { version: '8.7.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz" , main: 'true' }
+          { version: '9.1.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-9.1.0-linux-x86_64.tar.gz" , main: 'true' }
         ]
     env:
       LOGSTASH_SOURCE: 1
diff --git a/README.md b/README.md
index b256a266..edb3aa42 100755
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ This connector forwards data to
 
 ## Requirements
 
-- Logstash version 6+. [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
+- The latest version is tested with Logstash 9.1+ (which uses JDK 21). [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
 - Azure Data Explorer cluster with a database. Read [Create a cluster and database](https://docs.microsoft.com/en-us/azure/data-explorer/create-cluster-database-portal) for more information.
 - AAD Application credentials with permission to ingest data into Azure Data Explorer. Read [Creating an AAD Application](https://docs.microsoft.com/en-us/azure/kusto/management/access-control/how-to-provision-aad-app) for more information.
 
@@ -81,6 +81,8 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox
 
 | Version | Release Date | Notes |
 | --- | --- | --- |
+| 2.1.1 | 2025-09-25 | - Fix #84: add the thread id to the temp file path to avoid race conditions in multi-worker scenarios |
+| 2.1.0 | 2025-09-25 | - Bump SDK versions |
 | 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
 | 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK |
 | 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
diff --git a/build.gradle b/build.gradle
index da8f5435..e8055f87 100644
--- a/build.gradle
+++ b/build.gradle
@@ -93,7 +93,6 @@ dependencies {
     implementation 'net.java.dev.jna:jna:5.13.0'
     implementation 'net.minidev:accessors-smart:2.5.2'
     implementation 'net.minidev:json-smart:2.5.2'
-    implementation 'org.apache.commons:commons-lang3:3.14.0'
     implementation 'org.apache.commons:commons-text:1.11.0'
     implementation 'org.apache.httpcomponents:httpclient:4.5.14'
     implementation 'org.apache.httpcomponents:httpcore:4.4.16'
diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index cd61d5d2..251d64a1 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -67,7 +67,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # If `false`, the plugin will disregard temp files found
   config :recovery, validate: :boolean, default: true
 
-
   # The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal.
   config :ingest_url, validate: :string, required: true
 
@@ -95,7 +94,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # Mapping name - deprecated, use json_mapping
   config :mapping, validate: :string, deprecated: true
 
-
   # Determines if local files used for temporary storage will be deleted
   # after upload is successful
   config :delete_temp_files, validate: :boolean, default: true
 
@@ -150,12 +148,18 @@ def register
     end
     @failure_path = File.join(@file_root, @filename_failure)
 
-    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
-                                                  max_threads: upload_concurrent_count,
-                                                  max_queue: upload_queue_size,
-                                                  fallback_policy: :caller_runs)
+    executor = Concurrent::ThreadPoolExecutor.new(
+      min_threads: 1,
+      max_threads: upload_concurrent_count,
+      max_queue: upload_queue_size,
+      fallback_policy: :caller_runs
+    )
 
-    @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
+    @ingestor = Ingestor.new(
+      ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth,
+      database, table, final_mapping, delete_temp_files,
+      proxy_host, proxy_port, proxy_protocol, @logger, executor
+    )
 
     # send existing files
     recover_past_files if recovery
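
Editor's note: for readers unfamiliar with concurrent-ruby, here is a minimal, self-contained sketch (not part of the patch; the pool sizes are deliberately tiny and illustrative) of the `:caller_runs` fallback policy the executor above relies on. When the bounded queue is full, the overflow task runs synchronously on the submitting thread, which applies backpressure to the pipeline instead of dropping uploads.

```ruby
require 'concurrent'

# Illustrative pool, small enough that the queue fills up immediately.
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 1,
  max_threads: 2,
  max_queue: 2,
  fallback_policy: :caller_runs # overflow work runs on the caller's thread
)

main = Thread.current.object_id
10.times do |i|
  pool.post do
    sleep 0.05
    on_main = Thread.current.object_id == main
    puts "task #{i} ran on #{on_main ? 'caller (backpressure)' : 'pool'} thread"
  end
end

pool.shutdown
pool.wait_for_termination
```
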
@@ -173,23 +177,22 @@ def register
   end
 
   private
+
   def validate_path
     if (root_directory =~ FIELD_REF) != nil
       @logger.error('The starting part of the path should not be dynamic.', path: @path)
       raise LogStash::ConfigurationError.new('The starting part of the path should not be dynamic.')
     end
 
-    if !path_with_field_ref?
+    unless path_with_field_ref?
       @logger.error('Path should include some time related fields to allow for file rotation.', path: @path)
       raise LogStash::ConfigurationError.new('Path should include some time related fields to allow for file rotation.')
     end
   end
 
-  private
   def root_directory
     parts = @path.split(File::SEPARATOR).reject(&:empty?)
     if Gem.win_platform?
-      # First part is the drive letter
       parts[1]
     else
       parts.first
@@ -197,9 +200,9 @@ def root_directory
   end
 
   public
+
   def multi_receive_encoded(events_and_encoded)
     encoded_by_path = Hash.new { |h, k| h[k] = [] }
-
     events_and_encoded.each do |event, encoded|
       file_output_path = event_path(event)
       encoded_by_path[file_output_path] << encoded
@@ -208,11 +211,9 @@ def multi_receive_encoded(events_and_encoded)
     @io_mutex.synchronize do
       encoded_by_path.each do |path, chunks|
         fd = open(path)
-        # append to the file
         chunks.each { |chunk| fd.write(chunk) }
         fd.flush unless @flusher && @flusher.alive?
       end
-
       close_stale_files if @stale_cleanup_type == 'events'
     end
   end
@@ -222,29 +223,26 @@ def close
     @cleaner.stop unless @cleaner.nil?
     @io_mutex.synchronize do
       @logger.debug('Close: closing files')
-
       @files.each do |path, fd|
         begin
           fd.close
           @logger.debug("Closed file #{path}", fd: fd)
-
           kusto_send_file(path)
         rescue Exception => e
           @logger.error('Exception while flushing and closing files.', exception: e)
         end
       end
     end
-
     @ingestor.stop unless @ingestor.nil?
   end
 
   private
+
   def inside_file_root?(log_path)
     target_file = File.expand_path(log_path)
-    return target_file.start_with?("#{@file_root}/")
+    target_file.start_with?("#{@file_root}/")
   end
 
-  private
   def event_path(event)
     file_output_path = generate_filepath(event)
     if path_with_field_ref? && !inside_file_root?(file_output_path)
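
Editor's note: a quick illustration (not part of the patch; the paths are made up) of why the `inside_file_root?` guard kept above works. `File.expand_path` normalizes any `..` segments a dynamic path could smuggle in, so an event that would land outside the file root fails the prefix check and is redirected to the failure path.

```ruby
# Hypothetical values; the plugin derives its file root from the `path` option.
file_root = '/tmp/kusto'
log_path  = '/tmp/kusto/x/../../../etc/passwd'

target_file = File.expand_path(log_path)
puts target_file                              # => "/etc/passwd"
puts target_file.start_with?("#{file_root}/") # => false, so the failure path is used
```
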
@@ -254,77 +252,60 @@ def event_path(event)
       file_output_path = @failure_path
     end
     @logger.debug('Writing event to tmp file.', filename: file_output_path)
-
     file_output_path
   end
 
-  private
   def generate_filepath(event)
     event.sprintf(@path)
   end
 
-  private
   def path_with_field_ref?
     path =~ FIELD_REF
   end
 
-  private
   def extract_file_root
     parts = File.expand_path(path).split(File::SEPARATOR)
     parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR)
   end
 
-  # the back-bone of @flusher, our periodic-flushing interval.
-  private
   def flush_pending_files
     @io_mutex.synchronize do
       @logger.debug('Starting flush cycle')
-
       @files.each do |path, fd|
         @logger.debug('Flushing file', path: path, fd: fd)
         fd.flush
       end
     end
   rescue Exception => e
-    # squash exceptions caught while flushing after logging them
     @logger.error('Exception flushing files', exception: e.message, backtrace: e.backtrace)
   end
 
-  # every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
-  private
   def close_stale_files
     now = Time.now
     return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
-
     @logger.debug('Starting stale files cleanup cycle', files: @files)
-    inactive_files = @files.select { |path, fd| not fd.active }
+    inactive_files = @files.select { |path, fd| !fd.active }
    @logger.debug("#{inactive_files.count} stale files found", inactive_files: inactive_files)
     inactive_files.each do |path, fd|
       @logger.info("Closing file #{path}")
       fd.close
       @files.delete(path)
-
       kusto_send_file(path)
     end
-
     # mark all files as inactive, a call to write will mark them as active again
     @files.each { |path, fd| fd.active = false }
     @last_stale_cleanup_cycle = now
   end
 
-  private
   def cached?(path)
     @files.include?(path) && !@files[path].nil?
   end
 
-  private
   def deleted?(path)
     !File.exist?(path)
   end
 
-  private
   def open(path)
     return @files[path] if !deleted?(path) && cached?(path)
-
     if deleted?(path)
       if @create_if_deleted
         @logger.debug('Required file does not exist, creating it.', path: path)
@@ -333,11 +314,9 @@ def open(path)
         return @files[path] if cached?(path)
       end
     end
-
     @logger.info('Opening file', path: path)
-
     dir = File.dirname(path)
-    if !Dir.exist?(dir)
+    unless Dir.exist?(dir)
       @logger.info('Creating directory', directory: dir)
       if @dir_mode != -1
         FileUtils.mkdir_p(dir, mode: @dir_mode)
@@ -345,55 +324,37 @@ def open(path)
         FileUtils.mkdir_p(dir)
       end
     end
-
-    # work around a bug opening fifos (bug JRUBY-6280)
     stat = begin
-             File.stat(path)
-           rescue
-             nil
-           end
-    fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
-           java.io.FileWriter.new(java.io.File.new(path))
-         elsif @file_mode != -1
-           File.new(path, 'a+', @file_mode)
-         else
-           File.new(path, 'a+')
-         end
-    # fd = if @file_mode != -1
-    #        File.new(path, 'a+', @file_mode)
-    #      else
-    #        File.new(path, 'a+')
-    #      end
-    # end
+      File.stat(path)
+    rescue
+      nil
+    end
+    fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
+      java.io.FileWriter.new(java.io.File.new(path))
+    elsif @file_mode != -1
+      File.new(path, 'a+', @file_mode)
+    else
+      File.new(path, 'a+')
+    end
     @files[path] = IOWriter.new(fd)
   end
 
-  private
   def kusto_send_file(file_path)
     @ingestor.upload_async(file_path, delete_temp_files)
   end
 
-  private
   def recover_past_files
     require 'find'
-
-    # we need to find the last "regular" part in the path before any dynamic vars
     path_last_char = @path.length - 1
-
     pattern_start = @path.index('%') || path_last_char
     last_folder_before_pattern = @path.rindex('/', pattern_start) || path_last_char
     new_path = path[0..last_folder_before_pattern]
-
     begin
       return unless Dir.exist?(new_path)
-      @logger.info("Going to recover old files in path #{@new_path}")
-
+      @logger.info("Going to recover old files in path #{new_path}")
       old_files = Find.find(new_path).select { |p| /.*\.#{database}\.#{table}$/ =~ p }
       @logger.info("Found #{old_files.length} old file(s), sending them now...")
-
-      old_files.each do |file|
-        kusto_send_file(file)
-      end
+      old_files.each { |file| kusto_send_file(file) }
     rescue Errno::ENOENT => e
       @logger.warn('No such file or directory', exception: e.class, message: e.message, path: new_path, backtrace: e.backtrace)
     end
@@ -402,6 +363,8 @@ def recover_past_files
 
 # wrapper class
 class IOWriter
+  attr_accessor :active
+
   def initialize(io)
     @io = io
   end
@@ -417,11 +380,9 @@ def flush
 
   def method_missing(method_name, *args, &block)
     if @io.respond_to?(method_name)
-
       @io.send(method_name, *args, &block)
     else
       super
     end
   end
 
-  attr_accessor :active
 end
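
Editor's note: the prefix computation in `recover_past_files` above is easier to see with a concrete value. This worked example (the `path` value is hypothetical, not from the repo) shows how the method keeps everything up to the last directory separator before the first `%{...}` pattern, which is the directory where temp files from a previous run live.

```ruby
# Hypothetical `path` value with a dynamic date segment.
path = '/tmp/kusto/%{+YYYY-MM-dd}.txt'

path_last_char = path.length - 1
pattern_start = path.index('%') || path_last_char                # => 11
last_folder_before_pattern = path.rindex('/', pattern_start) ||
                             path_last_char                      # => 10
new_path = path[0..last_folder_before_pattern]

puts new_path # => "/tmp/kusto/" -- the directory scanned for leftover temp files
```
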
diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb
index f2e10601..d273c793 100755
--- a/lib/logstash/outputs/kusto/ingestor.rb
+++ b/lib/logstash/outputs/kusto/ingestor.rb
@@ -20,10 +20,10 @@ class Ingestor
     LOW_QUEUE_LENGTH = 3
     FIELD_REF = /%\{[^}]+\}/
 
-    def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
+    def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL)
       @workers_pool = threadpool
       @logger = logger
-      validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
+      validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
       @logger.info('Preparing Kusto resources.')
 
       kusto_java = Java::com.microsoft.azure.kusto
@@ -38,22 +38,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
       is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
       # Create a connection string
       kusto_connection_string = if is_managed_identity
-        if is_system_assigned_managed_identity
-          @logger.info('Using system managed identity.')
-          kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
-        else
-          @logger.info('Using user managed identity.')
-          kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
-        end
+        if is_system_assigned_managed_identity
+          @logger.info('Using system managed identity.')
+          kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
         else
-        if cli_auth
-          @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
-          kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
-        else
-          @logger.info('Using app id and app key.')
-          kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
-        end
+          @logger.info('Using user managed identity.')
+          kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
         end
+      else
+        if cli_auth
+          @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
+          kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
+        else
+          @logger.info('Using app id and app key.')
+          kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
+        end
+      end
 
       # @logger.debug(Gem.loaded_specs.to_s)
       # Unfortunately there's no way to avoid using the gem/plugin name directly...
@@ -62,8 +62,8 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
       java_util = Java::java.util
       # kusto_connection_string.setClientVersionForTracing(name_for_tracing)
-      version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
-      kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", java_util.Collections.emptyMap());
+      version_for_tracing = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
+      kusto_connection_string.setConnectorDetails("Logstash", version_for_tracing.to_s, "", "", false, "", java_util.Collections.emptyMap())
       @kusto_client = begin
         if is_direct_conn
           kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
@@ -91,7 +91,7 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
       @logger.debug('Kusto resources are ready.')
     end
 
-    def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
+    def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
       # Add an additional validation and fail this upfront
       if app_id.nil? && app_key.nil? && managed_identity_id.nil?
         if cli_auth
@@ -100,7 +100,7 @@ def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_k
           @logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
           raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
         end
-      end 
+      end
       if database =~ FIELD_REF
         @logger.error('database config value should not be dynamic.', database)
         raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
@@ -116,11 +116,10 @@ def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_k
         raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
       end
 
-      if not(["https", "http"].include? proxy_protocol)
+      unless ["https", "http"].include? proxy_protocol
         @logger.error('proxy_protocol has to be http or https.', proxy_protocol)
         raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
       end
-
     end
 
     def upload_async(path, delete_on_success)
@@ -130,7 +129,7 @@ def upload_async(path, delete_on_success)
 
       @workers_pool.post do
         LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
-        upload(path, delete_on_success)
+        uploadV2(path, delete_on_success)
       end
     rescue Exception => e
       @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
@@ -156,13 +155,13 @@ def upload(path, delete_on_success)
       # end
 
       if file_size > 0
-        file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path); # 0 - let the sdk figure out the size of the file
+        file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path) # let the SDK figure out the size of the file
         @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
       else
         @logger.warn("File #{path} is an empty file and is not ingested.")
       end
       File.delete(path) if delete_on_success
-      @logger.debug("File #{path} sent to kusto.")
+      @logger.info("File #{path} sent to kusto.")
     rescue Errno::ENOENT => e
       @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
     rescue Java::JavaNioFile::NoSuchFileException => e
@@ -177,6 +176,37 @@ def upload(path, delete_on_success)
       retry
     end
 
+    def uploadV2(path, delete_on_success)
+      file_size = File.size(path)
+      @logger.info("Ingesting #{file_size} bytes to database: #{@ingestion_properties.getDatabaseName} table: #{@ingestion_properties.getTableName}")
+      if file_size > 0
+        exceptions = Concurrent::Array.new
+        promise = Concurrent::Promises.future {
+          file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path) # let the SDK figure out the size of the file
+          ingest_result = @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
+        }
+        .rescue { |e|
+          @logger.error("Ingestion failed: #{e.message}, File on path #{path} will not be deleted.")
+          @logger.error("Ingestion failed: #{e.backtrace.join("\n")}")
+        }
+        .on_resolution do |fulfilled, value, reason, *args|
+          @logger.debug("Future fulfilled: #{fulfilled}, value: #{value}, reason: #{reason}, args: #{args}, class: #{value.class}")
+          if value.class == Java::ComMicrosoftAzureKustoIngestResult::IngestionStatusResult
+            isc = value.getIngestionStatusCollection()&.get(0)&.getStatus()
+            @logger.info("Ingestion status: #{isc}")
+            File.delete(path) if delete_on_success
+          else
+            @logger.warn("Ingestion status is a non-success status: #{value.class} - #{value}, File on path #{path} will not be deleted.")
+          end
+          if exceptions.size > 0
+            @logger.error("Ingestion failed with exceptions: #{exceptions.map(&:message).join(', ')}, File on path #{path} will not be deleted.")
+          end
+        end
+      else
+        @logger.warn("File #{path} is empty and is not ingested.")
+      end # if file_size > 0
+    end # def uploadV2
+
     def stop
       @workers_pool.shutdown
       @workers_pool.wait_for_termination(nil) # block until its done
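
Editor's note: `uploadV2` leans on concurrent-ruby's promise chaining, and one subtlety is worth spelling out. Because `.rescue` returns a *recovered* future, the `.on_resolution` callback sees `fulfilled=true` even after a failure, with `value` set to the rescue block's return value (here `nil`) rather than an `IngestionStatusResult`, which is why the non-success branch keeps the file. A minimal sketch with the ingest call stubbed out (the stub and paths are hypothetical):

```ruby
require 'concurrent'

# Stand-in for @kusto_client.ingestFromFile: raises like a vanished temp file.
def fake_ingest(path)
  raise Errno::ENOENT, path unless File.exist?(path)
  :ingest_result
end

future = Concurrent::Promises.future { fake_ingest('/tmp/does-not-exist') }
  .rescue { |e| warn "ingestion failed: #{e.message}" } # recovers; returns nil
  .on_resolution do |fulfilled, value, reason|
    # After a rescue the chain is fulfilled with the rescue block's value:
    # fulfilled=true, value=nil, reason=nil. value is not an ingestion
    # result, so a caller like uploadV2 would keep the temp file.
    puts "fulfilled=#{fulfilled} value=#{value.inspect} reason=#{reason.inspect}"
  end

future.wait
sleep 0.2 # on_resolution callbacks run asynchronously; give them time to print
```
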
diff --git a/random-data-json.sh b/random-data-json.sh
new file mode 100755
index 00000000..8c68a4b2
--- /dev/null
+++ b/random-data-json.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+# You will need to install jq for JSON handling
+
+while true
+do
+  # Generate a random IP
+  random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ */./g')
+
+  # Generate a random response size and HTTP status
+  random_size=$(( (RANDOM % 65535) + 1 ))
+  status_codes=(200 201 400 404 500)
+  random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}
+
+  # Generate the current timestamp in ISO 8601
+  timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
+
+  # Pick a random endpoint
+  endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
+  random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}
+
+  # Construct the JSON log line
+  json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')
+
+
+  echo "$json_log" | tee -a '/tmp/jsonlogs.txt'
+
+  sleep 0.1
+done
diff --git a/random-data.sh b/random-data.sh
new file mode 100755
index 00000000..9c7f6153
--- /dev/null
+++ b/random-data.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+while true
+do
+  random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ */./g')
+  random_size=$(( (RANDOM % 65535) + 1 ))
+  current_date_time=$(date '+%d/%b/%Y:%H:%M:%S %z')
+  echo "$random_ip - - [$current_date_time] \"GET /data.php HTTP/1.1\" 200 $random_size" | tee -a '/tmp/curllogs.txt'
+  sleep 0.001s
+done
\ No newline at end of file
diff --git a/version b/version
index 50aea0e7..7c327287 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-2.1.0
\ No newline at end of file
+2.1.1
\ No newline at end of file