Skip to content

Commit 5941adf

Browse files
* Refactor to classes
1 parent 64b4d34 commit 5941adf

File tree

8 files changed

+444
-242
lines changed

8 files changed

+444
-242
lines changed

lib/logstash/outputs/kusto.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require 'logstash/outputs/kusto/ingestor'
88
require 'logstash/outputs/kusto/interval'
99
require 'logstash/outputs/kusto/custom_size_based_buffer'
10+
require 'logstash/outputs/kusto/kustoLogstashConfiguration'
1011

1112
##
1213
# This plugin sends messages to Azure Kusto in batches.
@@ -91,13 +92,13 @@ def register
9192
max_threads: upload_concurrent_count,
9293
max_queue: upload_queue_size,
9394
fallback_policy: :caller_runs)
94-
95-
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor)
96-
97-
# Deprecation warning for path
98-
if @path
99-
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
100-
end
95+
96+
kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
97+
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
98+
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
99+
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
100+
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor)
101+
101102
end
102103

103104

@@ -114,8 +115,7 @@ def multi_receive_encoded(events_and_encoded)
114115

115116
def close
116117
@logger.info("Closing Kusto output plugin")
117-
118-
begin
118+
begin
119119
@buffer.shutdown unless @buffer.nil?
120120
@logger.info("Buffer shutdown") unless @buffer.nil?
121121
rescue => e

lib/logstash/outputs/kusto/custom_size_based_buffer.rb

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ def initialize(max_size_mb, max_interval, &flush_callback)
3939
end
4040
end
4141

42-
def <<(event)
43-
while buffer_full? do
44-
sleep 0.1
45-
end
42+
def <<(event)
43+
while buffer_full? do
44+
sleep 0.1
45+
end
4646

4747
@pending_mutex.synchronize do
4848
@buffer_state[:pending_items] << event
@@ -58,23 +58,23 @@ def shutdown
5858
clear_buffer_files
5959
end
6060

61-
private
61+
private
6262

63-
def buffer_full?
64-
@pending_mutex.synchronize do
65-
@buffer_state[:pending_size] >= @buffer_config[:max_size]
66-
end
67-
end
63+
def buffer_full?
64+
@pending_mutex.synchronize do
65+
@buffer_state[:pending_size] >= @buffer_config[:max_size]
66+
end
67+
end
6868

69-
def buffer_flush(options = {})
70-
force = options[:force] || options[:final]
71-
final = options[:final]
69+
def buffer_flush(options = {})
70+
force = options[:force] || options[:final]
71+
final = options[:final]
7272

73-
if final
74-
@flush_mutex.lock
75-
elsif !@flush_mutex.try_lock
76-
return 0
77-
end
73+
if final
74+
@flush_mutex.lock
75+
elsif !@flush_mutex.try_lock
76+
return 0
77+
end
7878

7979
items_flushed = 0
8080

@@ -85,7 +85,7 @@ def buffer_flush(options = {})
8585
@pending_mutex.synchronize do
8686
return 0 if @buffer_state[:pending_size] == 0
8787

88-
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
88+
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
8989

9090
if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
9191
return 0
@@ -123,8 +123,8 @@ def buffer_flush(options = {})
123123
@flush_mutex.unlock
124124
end
125125

126-
items_flushed
127-
end
126+
items_flushed
127+
end
128128

129129
def save_buffer_to_file(events)
130130
buffer_state_copy = {

lib/logstash/outputs/kusto/ingestor.rb

Lines changed: 21 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,39 @@ class Ingestor
2020
LOW_QUEUE_LENGTH = 3
2121
FIELD_REF = /%\{[^}]+\}/
2222

23-
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
23+
def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL)
2424
@workers_pool = threadpool
2525
@logger = logger
26-
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
26+
#Validate and assign
27+
kusto_logstash_configuration.validate_config()
28+
@kusto_logstash_configuration = kusto_logstash_configuration
29+
2730
@logger.info('Preparing Kusto resources.')
2831

2932
kusto_java = Java::com.microsoft.azure.kusto
3033
apache_http = Java::org.apache.http
31-
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
32-
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
33-
# If there is CLI Auth, use that instead of managed identity
34-
is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth)
34+
35+
is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity
3536
# If it is system managed identity, propagate the system identity
36-
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
37+
is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity
3738
# Is it direct connection
38-
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
39+
is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn
3940
# Create a connection string
4041
kusto_connection_string = if is_managed_identity
4142
if is_system_assigned_managed_identity
4243
@logger.info('Using system managed identity.')
43-
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
44+
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url)
4445
else
4546
@logger.info('Using user managed identity.')
46-
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
47+
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id)
4748
end
4849
else
49-
if cli_auth
50+
if @kusto_logstash_configuration.kusto_auth.cli_auth
5051
@logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
51-
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
52+
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url)
5253
else
5354
@logger.info('Using app id and app key.')
54-
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
55+
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant)
5556
end
5657
end
5758
@logger.debug(Gem.loaded_specs.to_s)
@@ -62,22 +63,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
6263
tuple_utils = Java::org.apache.commons.lang3.tuple
6364
# kusto_connection_string.setClientVersionForTracing(name_for_tracing)
6465
version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
65-
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());
66+
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray());
6667

6768
@kusto_client = begin
6869
if is_direct_conn
6970
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
7071
else
71-
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
72+
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build()
7273
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
7374
end
7475
end
7576

76-
@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
77-
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
78-
if is_mapping_ref_provided
79-
@logger.debug('Using mapping reference.', json_mapping)
80-
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
77+
@ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table)
78+
79+
if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided
80+
@logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping)
81+
@ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
8182
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
8283
else
8384
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
@@ -86,38 +87,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
8687
@logger.debug('Kusto resources are ready.')
8788
end
8889

89-
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
90-
# Add an additional validation and fail this upfront
91-
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
92-
if cli_auth
93-
@logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
94-
else
95-
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
96-
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
97-
end
98-
end
99-
if database =~ FIELD_REF
100-
@logger.error('database config value should not be dynamic.', database)
101-
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
102-
end
103-
104-
if table =~ FIELD_REF
105-
@logger.error('table config value should not be dynamic.', table)
106-
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
107-
end
108-
109-
if json_mapping =~ FIELD_REF
110-
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
111-
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
112-
end
113-
114-
if not(["https", "http"].include? proxy_protocol)
115-
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
116-
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
117-
end
118-
119-
end
120-
12190
def upload_async(data)
12291
if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
12392
@logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")

lib/logstash/outputs/kusto/interval.rb

Lines changed: 63 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,77 +5,77 @@
55
require 'logstash/errors'
66

77
class LogStash::Outputs::Kusto < LogStash::Outputs::Base
8-
##
9-
# Bare-bones utility for running a block of code at an interval.
10-
#
11-
class Interval
12-
##
13-
# Initializes a new Interval with the given arguments and starts it
14-
# before returning it.
15-
#
16-
# @param interval [Integer] (see: Interval#initialize)
17-
# @param procsy [#call] (see: Interval#initialize)
18-
#
19-
# @return [Interval]
20-
#
21-
def self.start(interval, procsy)
22-
new(interval, procsy).tap(&:start)
23-
end
8+
##
9+
# Bare-bones utility for running a block of code at an interval.
10+
#
11+
class Interval
12+
##
13+
# Initializes a new Interval with the given arguments and starts it
14+
# before returning it.
15+
#
16+
# @param interval [Integer] (see: Interval#initialize)
17+
# @param procsy [#call] (see: Interval#initialize)
18+
#
19+
# @return [Interval]
20+
#
21+
def self.start(interval, procsy)
22+
new(interval, procsy).tap(&:start)
23+
end
2424

25-
##
26-
# @param interval [Integer]: time in seconds to wait between calling the given proc
27-
# @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions.
28-
def initialize(interval, procsy)
29-
@interval = interval
30-
@procsy = procsy
25+
##
26+
# @param interval [Integer]: time in seconds to wait between calling the given proc
27+
# @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions.
28+
def initialize(interval, procsy)
29+
@interval = interval
30+
@procsy = procsy
3131

32-
# Mutex, ConditionVariable, etc.
33-
@mutex = Mutex.new
34-
@sleeper = ConditionVariable.new
35-
end
32+
# Mutex, ConditionVariable, etc.
33+
@mutex = Mutex.new
34+
@sleeper = ConditionVariable.new
35+
end
3636

37-
##
38-
# Starts the interval, or returns if it has already been started.
39-
#
40-
# @return [void]
41-
def start
42-
@mutex.synchronize do
43-
return if @thread && @thread.alive?
37+
##
38+
# Starts the interval, or returns if it has already been started.
39+
#
40+
# @return [void]
41+
def start
42+
@mutex.synchronize do
43+
return if @thread && @thread.alive?
4444

45-
@thread = Thread.new { run }
46-
end
47-
end
45+
@thread = Thread.new { run }
46+
end
47+
end
4848

49-
##
50-
# Stop the interval.
51-
# Does not interrupt if execution is in-progress.
52-
def stop
53-
@mutex.synchronize do
54-
@stopped = true
55-
end
49+
##
50+
# Stop the interval.
51+
# Does not interrupt if execution is in-progress.
52+
def stop
53+
@mutex.synchronize do
54+
@stopped = true
55+
end
5656

57-
@thread && @thread.join
58-
end
57+
@thread && @thread.join
58+
end
5959

60-
##
61-
# @return [Boolean]
62-
def alive?
63-
@thread && @thread.alive?
64-
end
60+
##
61+
# @return [Boolean]
62+
def alive?
63+
@thread && @thread.alive?
64+
end
6565

66-
private
66+
private
6767

68-
def run
69-
@mutex.synchronize do
70-
loop do
71-
@sleeper.wait(@mutex, @interval)
72-
break if @stopped
68+
def run
69+
@mutex.synchronize do
70+
loop do
71+
@sleeper.wait(@mutex, @interval)
72+
break if @stopped
7373

74-
@procsy.call
75-
end
76-
end
77-
ensure
78-
@sleeper.broadcast
79-
end
80-
end
74+
@procsy.call
75+
end
76+
end
77+
ensure
78+
@sleeper.broadcast
79+
end
80+
end
8181
end

0 commit comments

Comments
 (0)