2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
logstash: [
{ version: '8.7.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz" , main: 'true' }
{ version: '9.1.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-9.1.0-linux-x86_64.tar.gz" , main: 'true' }
]
env:
LOGSTASH_SOURCE: 1
4 changes: 3 additions & 1 deletion README.md
@@ -18,7 +18,7 @@ This connector forwards data to

## Requirements

- Logstash version 6+. [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
- The latest version is tested against Logstash 9.1+ (which runs on JDK 21). [Installation instructions](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html)
- Azure Data Explorer cluster with a database. Read [Create a cluster and database](https://docs.microsoft.com/en-us/azure/data-explorer/create-cluster-database-portal) for more information.
- AAD Application credentials with permission to ingest data into Azure Data Explorer. Read [Creating an AAD Application](https://docs.microsoft.com/en-us/azure/kusto/management/access-control/how-to-provision-aad-app) for more information.

@@ -81,6 +81,8 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox

| Version | Release Date | Notes |
| --- | --- | --- |
| 2.1.1 | 2025-09-25 | - Fix #84: add the thread id to the temp file path to avoid race conditions in multi-worker scenarios |
| 2.1.0 | 2025-09-25 | - Bump SDK versions |
| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
| 2.0.7 | 2024-01-01 | - Update Kusto Java SDK |
| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided, Logstash output JSON attribute names will be used for column resolution |
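The 2.1.1 entry above is worth a sketch. With multiple pipeline workers, two threads can resolve an event to the same temp file and race on writes and ingestion; folding the thread id into the file name gives each worker its own file. The helper below is illustrative only (the name `temp_file_path` and the exact naming scheme are assumptions, not the plugin's actual code):

```ruby
# Illustrative sketch of the thread-id fix, not the plugin's implementation.
def temp_file_path(base_dir, database, table)
  # Without the thread id, two workers flushing in the same minute could
  # collide on one file; Thread.current.object_id keeps paths distinct.
  time_part = Time.now.strftime('%Y-%m-%d-%H-%M')
  thread_id = Thread.current.object_id
  File.join(base_dir, "logstash_#{time_part}_#{thread_id}.#{database}.#{table}")
end
```

The `.<database>.<table>` suffix mirrors what `recover_past_files` in `lib/logstash/outputs/kusto.rb` looks for when re-sending leftover files.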
1 change: 0 additions & 1 deletion build.gradle
@@ -93,7 +93,6 @@ dependencies {
implementation 'net.java.dev.jna:jna:5.13.0'
implementation 'net.minidev:accessors-smart:2.5.2'
implementation 'net.minidev:json-smart:2.5.2'
implementation 'org.apache.commons:commons-lang3:3.14.0'
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
105 changes: 33 additions & 72 deletions lib/logstash/outputs/kusto.rb
@@ -67,7 +67,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# If `false`, the plugin will disregard temp files found
config :recovery, validate: :boolean, default: true


# The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal.
config :ingest_url, validate: :string, required: true

@@ -95,7 +94,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Mapping name - deprecated, use json_mapping
config :mapping, validate: :string, deprecated: true


# Determines if local files used for temporary storage will be deleted
# after upload is successful
config :delete_temp_files, validate: :boolean, default: true
@@ -150,12 +148,18 @@ def register
end
@failure_path = File.join(@file_root, @filename_failure)

executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)
executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs
)
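# Note: with a bounded queue, the :caller_runs fallback policy runs the task
# on the submitting pipeline thread once the queue fills up, so slow
# ingestion backpressures workers rather than dropping uploads.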

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
@ingestor = Ingestor.new(
ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth,
database, table, final_mapping, delete_temp_files,
proxy_host, proxy_port, proxy_protocol, @logger, executor
)

# send existing files
recover_past_files if recovery
@@ -173,33 +177,32 @@ def register
end

private

def validate_path
if (root_directory =~ FIELD_REF) != nil
@logger.error('The starting part of the path should not be dynamic.', path: @path)
raise LogStash::ConfigurationError.new('The starting part of the path should not be dynamic.')
end

if !path_with_field_ref?
unless path_with_field_ref?
@logger.error('Path should include some time related fields to allow for file rotation.', path: @path)
raise LogStash::ConfigurationError.new('Path should include some time related fields to allow for file rotation.')
end
end
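# A path that passes both checks keeps a static root and adds time fields for
# rotation, e.g. "/tmp/kusto/ls-%{+YYYY-MM-dd-HH-mm}.txt" (illustrative path).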

private
def root_directory
parts = @path.split(File::SEPARATOR).reject(&:empty?)
if Gem.win_platform?
# First part is the drive letter
parts[1]
else
parts.first
end
end

public

def multi_receive_encoded(events_and_encoded)
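# Group encoded payloads by their resolved output path; the Hash default
# block starts each new path key with an empty array.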
encoded_by_path = Hash.new { |h, k| h[k] = [] }

events_and_encoded.each do |event, encoded|
file_output_path = event_path(event)
encoded_by_path[file_output_path] << encoded
@@ -208,11 +211,9 @@ def multi_receive_encoded(events_and_encoded)
@io_mutex.synchronize do
encoded_by_path.each do |path, chunks|
fd = open(path)
# append to the file
chunks.each { |chunk| fd.write(chunk) }
fd.flush unless @flusher && @flusher.alive?
end

close_stale_files if @stale_cleanup_type == 'events'
end
end
@@ -222,29 +223,26 @@ def close
@cleaner.stop unless @cleaner.nil?
@io_mutex.synchronize do
@logger.debug('Close: closing files')

@files.each do |path, fd|
begin
fd.close
@logger.debug("Closed file #{path}", fd: fd)

kusto_send_file(path)
rescue Exception => e
@logger.error('Exception while flushing and closing files.', exception: e)
end
end
end

@ingestor.stop unless @ingestor.nil?
end

private

def inside_file_root?(log_path)
target_file = File.expand_path(log_path)
return target_file.start_with?("#{@file_root}/")
target_file.start_with?("#{@file_root}/")
end

private
def event_path(event)
file_output_path = generate_filepath(event)
if path_with_field_ref? && !inside_file_root?(file_output_path)
@@ -254,77 +252,60 @@ def event_path(event)
file_output_path = @failure_path
end
@logger.debug('Writing event to tmp file.', filename: file_output_path)

file_output_path
end

private
def generate_filepath(event)
event.sprintf(@path)
end

private
def path_with_field_ref?
path =~ FIELD_REF
end

private
def extract_file_root
parts = File.expand_path(path).split(File::SEPARATOR)
parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR)
end

# the back-bone of @flusher, our periodic-flushing interval.
private
def flush_pending_files
@io_mutex.synchronize do
@logger.debug('Starting flush cycle')

@files.each do |path, fd|
@logger.debug('Flushing file', path: path, fd: fd)
fd.flush
end
end
rescue Exception => e
# squash exceptions caught while flushing after logging them
@logger.error('Exception flushing files', exception: e.message, backtrace: e.backtrace)
end

# every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
private
def close_stale_files
now = Time.now
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval

@logger.debug('Starting stale files cleanup cycle', files: @files)
inactive_files = @files.select { |path, fd| not fd.active }
inactive_files = @files.select { |path, fd| !fd.active }
@logger.debug("#{inactive_files.count} stale files found", inactive_files: inactive_files)
inactive_files.each do |path, fd|
@logger.info("Closing file #{path}")
fd.close
@files.delete(path)

kusto_send_file(path)
end
# mark all files as inactive, a call to write will mark them as active again
@files.each { |path, fd| fd.active = false }
@last_stale_cleanup_cycle = now
end
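# Net effect: a file is closed and shipped only if nothing wrote to it (and
# re-marked it active) between two consecutive cleanup cycles.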

private
def cached?(path)
@files.include?(path) && !@files[path].nil?
end

private
def deleted?(path)
!File.exist?(path)
end

private
def open(path)
return @files[path] if !deleted?(path) && cached?(path)

if deleted?(path)
if @create_if_deleted
@logger.debug('Required file does not exist, creating it.', path: path)
@@ -333,67 +314,47 @@ def open(path)
return @files[path] if cached?(path)
end
end

@logger.info('Opening file', path: path)

dir = File.dirname(path)
if !Dir.exist?(dir)
unless Dir.exist?(dir)
@logger.info('Creating directory', directory: dir)
if @dir_mode != -1
FileUtils.mkdir_p(dir, mode: @dir_mode)
else
FileUtils.mkdir_p(dir)
end
end

# work around a bug opening fifos (bug JRUBY-6280)
stat = begin
File.stat(path)
rescue
nil
end
fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
java.io.FileWriter.new(java.io.File.new(path))
elsif @file_mode != -1
File.new(path, 'a+', @file_mode)
else
File.new(path, 'a+')
end
# fd = if @file_mode != -1
# File.new(path, 'a+', @file_mode)
# else
# File.new(path, 'a+')
# end
# end
File.stat(path)
rescue
nil
end
fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby?
java.io.FileWriter.new(java.io.File.new(path))
elsif @file_mode != -1
File.new(path, 'a+', @file_mode)
else
File.new(path, 'a+')
end
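# Wrap the raw IO in IOWriter so each write can flip the `active` flag that
# close_stale_files checks above.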
@files[path] = IOWriter.new(fd)
end

private
def kusto_send_file(file_path)
@ingestor.upload_async(file_path, delete_temp_files)
end

private
def recover_past_files
require 'find'

# we need to find the last "regular" part in the path before any dynamic vars
path_last_char = @path.length - 1

pattern_start = @path.index('%') || path_last_char
last_folder_before_pattern = @path.rindex('/', pattern_start) || path_last_char
new_path = path[0..last_folder_before_pattern]
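# Example: for @path "/tmp/kusto/%{+YYYY}/out.txt", pattern_start is 11,
# last_folder_before_pattern is 10, and new_path becomes "/tmp/kusto/".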

begin
return unless Dir.exist?(new_path)
@logger.info("Going to recover old files in path #{@new_path}")

@logger.info("Going to recover old files in path #{new_path}")
old_files = Find.find(new_path).select { |p| /.*\.#{database}\.#{table}$/ =~ p }
@logger.info("Found #{old_files.length} old file(s), sending them now...")

old_files.each do |file|
kusto_send_file(file)
end
old_files.each { |file| kusto_send_file(file) }
rescue Errno::ENOENT => e
@logger.warn('No such file or directory', exception: e.class, message: e.message, path: new_path, backtrace: e.backtrace)
end
@@ -402,6 +363,8 @@ def recover_past_files

# wrapper class
class IOWriter
attr_accessor :active

def initialize(io)
@io = io
end
@@ -417,11 +380,9 @@ def flush

def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)

@io.send(method_name, *args, &block)
else
super
end
end
attr_accessor :active
end