From b51011216dbfbd7c7120f4c95148db176ca1b5bf Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Wed, 13 Dec 2023 21:11:24 +0530 Subject: [PATCH 1/7] * Start work on refactor --- .gitignore | 1 + lib/logstash/outputs/kusto.rb | 6 +- lib/logstash/outputs/kusto/ingestor.rb | 93 +++++------- .../outputs/kusto/kustoAadProvider.rb | 97 +++++++++++++ .../kusto/kustoLogstashConfiguration.rb | 132 ++++++++++++++++++ spec/outputs/kusto/ingestor_spec.rb | 131 ----------------- .../kusto/kustoLogstashConfiguration_spec.rb | 95 +++++++++++++ 7 files changed, 363 insertions(+), 192 deletions(-) create mode 100644 lib/logstash/outputs/kusto/kustoAadProvider.rb create mode 100644 lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb delete mode 100755 spec/outputs/kusto/ingestor_spec.rb create mode 100755 spec/outputs/kusto/kustoLogstashConfiguration_spec.rb diff --git a/.gitignore b/.gitignore index cd3e8864..fc91a8ed 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ gradle/wrapper/gradle-wrapper.properties rspec.xml e2e/output_file.txt logs.txt +local-run.sh diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index dacb442b..66962ae9 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -118,6 +118,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this config :proxy_protocol, validate: :string, required: false , default: 'http' + # Use proxy for AAD only. If true, the plugin will use the proxy only for AAD authentication and will not use it for the actual data transfer. + config :proxy_aad_only, validate: :boolean, required: false , default: false + default :codec, 'json_lines' def register @@ -154,7 +157,8 @@ def register max_queue: upload_queue_size, fallback_policy: :caller_runs) - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor) + @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, + final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor) # send existing files recover_past_files if recovery diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 3a0b4a5a..986a383f 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,30 +20,41 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id) + @kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger) + @kustoLogstashConfiguration.validate_config() @logger.info('Preparing Kusto resources.') + @ingestion_properties = get_ingestion_properties() + if @kustoLogstashConfiguration.proxy_aad_only + @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration) + end + @delete_local = delete_local + @logger.debug('Kusto resources are ready.') + end + + def get_kusto_client() + if @kusto_client.nil? + kusto_client = create_kusto_client() + end + return kusto_client + end + def create_kusto_client() kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - is_managed_identity = (app_id.nil? && app_key.nil?) - # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) - # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) # Create a connection string - kusto_connection_string = if is_managed_identity - if is_system_assigned_managed_identity + kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity + if @kustoLogstashConfiguration.is_system_assigned_managed_identity @logger.info('Using system managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) else @logger.info('Using user managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) end + elsif @kustoLogstashConfiguration.proxy_aad_only + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, ) else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) end @@ -67,45 +78,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) - @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) - else - @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') - end - @delete_local = delete_local - @logger.debug('Kusto resources are ready.') - end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end + end + def get_ingestion_properties() + ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table) + if @kustoLogstashConfiguration.is_mapping_ref_provided + @logger.debug('Using mapping reference.', json_mapping) + ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) + else + @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') + end + return ingestion_properties end def upload_async(path, delete_on_success) @@ -125,21 +113,6 @@ def upload_async(path, delete_on_success) def upload(path, delete_on_success) file_size = File.size(path) @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") - - # TODO: dynamic routing - # file_metadata = path.partition('.kusto.').last - # file_metadata_parts = file_metadata.split('.') - - # if file_metadata_parts.length == 3 - # # this is the number we expect - database, table, json_mapping - # database = file_metadata_parts[0] - # table = file_metadata_parts[1] - # json_mapping = file_metadata_parts[2] - - # local_ingestion_properties = Java::KustoIngestionProperties.new(database, table) - # local_ingestion_properties.addJsonMappingName(json_mapping) - # end - if file_size > 0 file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb new file mode 100644 index 00000000..5f11f1bc --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -0,0 +1,97 @@ +# encoding: utf - 8 +require "logstash/kusto/kustoLogstashConfiguration" +require 'rest-client' +require 'json' +require 'openssl' +require 'base64' +require 'time' + +module LogStash + module Outputs + module KustoInternal + class KustoAadTokenProvider + def initialize(kustoLogstashConfiguration) + @kustoLogstashConfiguration = kustoLogstashConfiguration + # Perform the auth initialization + scope = CGI.escape(sprintf("%s/.default",kustoLogstashConfiguration.ingest_url)) + @logger = logger + @aad_uri = "https://login.microsoftonline.com" + @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key) + @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant) + @token_state = { + : access_token => nil, + : expiry_time => nil, + : token_details_mutex => Mutex.new, + } + end # def initialize + + # Public methods + public + + def get_aad_token_bearer() + @token_state[: token_details_mutex].synchronize + do + if is_saved_token_need_refresh() + refresh_saved_token() + end + return @token_state[: access_token] + end + end # def get_aad_token_bearer + + # Private methods + private + + def is_saved_token_need_refresh() + return @token_state[: access_token].nil ? || @token_state[: expiry_time].nil ? || @token_state[: expiry_time] <= Time.now + end # def is_saved_token_need_refresh + + def refresh_saved_token() + @logger.info("aad token expired - refreshing token.") + token_response = post_token_request() + @token_state[: access_token] = token_response["access_token"] + @token_state[: expiry_time] = get_token_expiry_time(token_response["expires_in"]) + end # def refresh_saved_token + + def get_token_expiry_time(expires_in_seconds) + if (expires_in_seconds.nil ? || expires_in_seconds <= 0) + return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours + else + return Time.now + expires_in_seconds - 1; + # Decrease by 1 second to be on the safe side + end + end # def get_token_expiry_time + + # Post the given json to Azure Loganalytics + def post_token_request() + # Create REST request header + headers = get_header() + while true + begin + proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port) + # Post REST request + response = RestClient::Request.execute(method:: post, url: @token_request_uri, payload: @token_request_body, headers: headers, + proxy: proxy_aad) + + if (response.code == 200 || response.code == 201) + return JSON.parse(response.body) + end + rescue RestClient::ExceptionWithResponse => ewr + @logger.error("Exception while authenticating with AAD API ['#{ewr.response}']") + rescue Exception => ex + @logger.trace("Exception while authenticating with AAD API ['#{ex}']") + end + @logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.") + sleep 10 + end + end # def post_token_request + # Create a header + def get_header() + return { + 'Content-Type' => 'application/x-www-form-urlencoded', + } + end # def get_header + end # class KustoAadTokenProvider + end # module Kusto + end # module Outputs +end # module LogStash + diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 00000000..4fb8dec4 --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,132 @@ +# encoding: utf-8 +# A class just having all the configurations wrapped into a seperate object + +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol, proxy_aad_only, logger) + @logger = logger + # For ingestion + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + # Authentication configuration + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + # proxy configuration + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Fields that are derived from the configuration + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? + # If it is system managed identity, propagate the system identity + @is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + # Is it direct connection + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + @logger.info("Kusto configuration initialized.") + end # def initialize + + def validate_config() + # Add an additional validation and fail this upfront + if @app_id.to_s.empty? && @app_key.to_s.empty? && @managed_identity_id.to_s.empty? + @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @proxy_aad_only && @is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + + if @table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + + if @json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + + if not(["https", "http"].include? @proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + # If all validation pass then configuration is valid + return true + end #validate_config() + + # Getters for all the attributes defined in this class + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + + def is_managed_identity + @is_managed_identity + end + + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoLogstashConfiguration + end # module Kusto + end # module Outputs +end # module LogStash \ No newline at end of file diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index cc0263c4..00000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,131 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:delete_local) { false } - let(:logger) { spy('logger') } - - describe '#initialize' do - - it 'does not throw an error when initializing' do - # note that this will cause an internal error since connection is being tried. - # however we still want to test that all the java stuff is working as expected - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with database: #{test_table}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with database: #{json_mapping}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - end - - # describe 'receiving events' do - - # context 'with non-zero flush interval' do - # let(:temporary_output_file) { Stud::Temporary.pathname } - - # let(:event_count) { 100 } - # let(:flush_interval) { 5 } - - # let(:events) do - # event_count.times.map do |idx| - # LogStash::Event.new('subject' => idx) - # end - # end - - # let(:output) { described_class.new(options.merge( {'path' => temporary_output_file, 'flush_interval' => flush_interval, 'delete_temp_files' => false } )) } - - # before(:each) { output.register } - - # after(:each) do - # output.close - # File.exist?(temporary_output_file) && File.unlink(temporary_output_file) - # File.exist?(temporary_output_file + '.kusto') && File.unlink(temporary_output_file + '.kusto') - # end - - # it 'eventually flushes without receiving additional events' do - # output.multi_receive_encoded(events) - - # # events should not all be flushed just yet... - # expect(File.read(temporary_output_file)).to satisfy("have less than #{event_count} lines") do |contents| - # contents && contents.lines.count < event_count - # end - - # # wait for the flusher to run... - # sleep(flush_interval + 1) - - # # events should all be flushed - # expect(File.read(temporary_output_file)).to satisfy("have exactly #{event_count} lines") do |contents| - # contents && contents.lines.count == event_count - # end - # end - # end - - # end -end diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 00000000..70a6f0e8 --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,95 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + + describe '#initialize' do + it 'does not throw an error when initializing' do + # note that this will cause an internal error since connection is being tried. + # however we still want to test that all the java stuff is working as expected + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol, proxy_aad_only, logger) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] + + context 'doesnt allow database to have some dynamic part' do + dynamic_name_array.each do |test_database| + it "with database: #{test_database}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow table to have some dynamic part' do + dynamic_name_array.each do |test_table| + it "with database: #{test_table}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow mapping to have some dynamic part' do + dynamic_name_array.each do |json_mapping| + it "with database: #{json_mapping}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'proxy protocol has to be http or https' do + it "with proxy protocol: socks" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks', proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid has to be provided' do + it "with empty managed identity and appid" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks', proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'if proxy_aad is provided' do + it "proxy details should be provided" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, nil, nil,'https', true,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end +end From ba8a58c2409132e2cf18321b32c4f6b63ff700b7 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Thu, 14 Dec 2023 11:39:32 +0530 Subject: [PATCH 2/7] * Revision with fixes --- lib/logstash/outputs/kusto.rb | 13 +++--- lib/logstash/outputs/kusto/ingestor.rb | 42 ++++++++--------- .../outputs/kusto/kustoAadProvider.rb | 45 ++++++++++--------- .../kusto/kustoLogstashConfiguration.rb | 25 ++++++----- logstash-output-kusto.gemspec | 1 + 5 files changed, 62 insertions(+), 64 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 66962ae9..808c8ef2 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -6,6 +6,7 @@ require 'logstash/outputs/kusto/ingestor' require 'logstash/outputs/kusto/interval' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' ## # This plugin sends messages to Azure Kusto in batches. @@ -152,17 +153,13 @@ def register end @failure_path = File.join(@file_root, @filename_failure) - executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, - max_threads: upload_concurrent_count, - max_queue: upload_queue_size, - fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, - final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor) + kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger) + kustoLogstashConfiguration.validate_config() + executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) + @ingestor = Ingestor.new(kustoLogstashConfiguration, @logger, executor) # send existing files recover_past_files if recovery - @last_stale_cleanup_cycle = Time.now @flush_interval = @flush_interval.to_i diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 986a383f..a19756f6 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -1,8 +1,9 @@ # encoding: utf-8 - require 'logstash/outputs/base' require 'logstash/namespace' require 'logstash/errors' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' +require 'logstash/outputs/kusto/kustoAadProvider' class LogStash::Outputs::Kusto < LogStash::Outputs::Base ## @@ -18,27 +19,24 @@ class Ingestor fallback_policy: :caller_runs ) LOW_QUEUE_LENGTH = 3 - FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL) - @workers_pool = threadpool + def initialize(kustoLogstashConfiguration, logger, threadpool = DEFAULT_THREADPOOL) @logger = logger - @kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger) - @kustoLogstashConfiguration.validate_config() + @workers_pool = threadpool + @kustoLogstashConfiguration = kustoLogstashConfiguration @logger.info('Preparing Kusto resources.') @ingestion_properties = get_ingestion_properties() if @kustoLogstashConfiguration.proxy_aad_only @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration) end - @delete_local = delete_local @logger.debug('Kusto resources are ready.') end def get_kusto_client() - if @kusto_client.nil? + if @kusto_client.nil? || (@kustoLogstashConfiguration.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh()) kusto_client = create_kusto_client() end - return kusto_client + return @kusto_client end def create_kusto_client() @@ -48,15 +46,15 @@ def create_kusto_client() kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity if @kustoLogstashConfiguration.is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.managed_identity_id) end elsif @kustoLogstashConfiguration.proxy_aad_only - kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, ) + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kustoLogstashConfiguration.ingest_url,@kustoLogstashConfiguration.get_aad_token_bearer()) else - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.app_id, @kustoLogstashConfiguration.app_key.value, @kustoLogstashConfiguration.app_tenant) end # @logger.debug(Gem.loaded_specs.to_s) @@ -70,25 +68,21 @@ def create_kusto_client() kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin - if is_direct_conn + if @kustoLogstashConfiguration.is_direct_conn || @kustoLogstashConfiguration.proxy_aad_only kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kustoLogstashConfiguration.proxy_host,@kustoLogstashConfiguration.proxy_port,@kustoLogstashConfiguration.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - - - - - end def get_ingestion_properties() + kusto_java = Java::com.microsoft.azure.kusto ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table) if @kustoLogstashConfiguration.is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @logger.debug('Using mapping reference.', @kustoLogstashConfiguration.json_mapping) + ingestion_properties.setIngestionMapping(@kustoLogstashConfiguration.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @@ -115,7 +109,7 @@ def upload(path, delete_on_success) @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") if file_size > 0 file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file - @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) + get_kusto_client().ingestFromFile(file_source_info, @ingestion_properties) else @logger.warn("File #{path} is an empty file and is not ingested.") end @@ -140,4 +134,4 @@ def stop @workers_pool.wait_for_termination(nil) # block until its done end end -end +end \ No newline at end of file diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index 5f11f1bc..668d931d 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -1,10 +1,11 @@ -# encoding: utf - 8 -require "logstash/kusto/kustoLogstashConfiguration" +# encoding:utf-8 require 'rest-client' require 'json' require 'openssl' require 'base64' require 'time' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + module LogStash module Outputs @@ -19,45 +20,45 @@ def initialize(kustoLogstashConfiguration) @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key) @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant) @token_state = { - : access_token => nil, - : expiry_time => nil, - : token_details_mutex => Mutex.new, - } + :access_token => nil, + :expiry_time => nil, + :token_details_mutex => Mutex.new, + } end # def initialize # Public methods public def get_aad_token_bearer() - @token_state[: token_details_mutex].synchronize - do - if is_saved_token_need_refresh() - refresh_saved_token() - end - return @token_state[: access_token] + @token_state[:token_details_mutex].synchronize do + if is_saved_token_need_refresh() + refresh_saved_token() + end + return @token_state[:access_token] end end # def get_aad_token_bearer - # Private methods - private def is_saved_token_need_refresh() - return @token_state[: access_token].nil ? || @token_state[: expiry_time].nil ? || @token_state[: expiry_time] <= Time.now + return @token_state[:access_token].nil? || @token_state[:expiry_time].nil? || @token_state[:expiry_time] <= Time.now end # def is_saved_token_need_refresh + # Private methods + private + def refresh_saved_token() @logger.info("aad token expired - refreshing token.") token_response = post_token_request() - @token_state[: access_token] = token_response["access_token"] - @token_state[: expiry_time] = get_token_expiry_time(token_response["expires_in"]) + @token_state[:access_token] = token_response["access_token"] + @token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"]) end # def refresh_saved_token def get_token_expiry_time(expires_in_seconds) - if (expires_in_seconds.nil ? || expires_in_seconds <= 0) + if (expires_in_seconds.nil? || expires_in_seconds <= 0) return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours else - return Time.now + expires_in_seconds - 1; - # Decrease by 1 second to be on the safe side + return Time.now + expires_in_seconds - 30 * 60; + # Decrease by 30 seconds to be on the safe side end end # def get_token_expiry_time @@ -69,8 +70,8 @@ def post_token_request() begin proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port) # Post REST request - response = RestClient::Request.execute(method:: post, url: @token_request_uri, payload: @token_request_body, headers: headers, - proxy: proxy_aad) + response = RestClient::Request.execute(method::post, url:@token_request_uri, payload:@token_request_body, headers:headers, + proxy:proxy_aad) if (response.code == 200 || response.code == 201) return JSON.parse(response.body) diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb index 4fb8dec4..e7d8ed52 100644 --- a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -1,6 +1,5 @@ # encoding: utf-8 # A class just having all the configurations wrapped into a seperate object - module LogStash module Outputs module KustoInternal @@ -46,24 +45,30 @@ def validate_config() end if @database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end + @logger.error('database config value should not be dynamic.', database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end if @table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + @logger.error('table config value should not be dynamic.', table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') end if @json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + @logger.error('json_mapping config value should not be dynamic.', json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') end if not(["https", "http"].include? @proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + @logger.error('proxy_protocol has to be http or https.', proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + + if @proxy_aad_only && @is_direct_conn + @logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',proxy_host,proxy_port,proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.') end + # If all validation pass then configuration is valid return true end #validate_config() diff --git a/logstash-output-kusto.gemspec b/logstash-output-kusto.gemspec index af083c13..c586c9d9 100755 --- a/logstash-output-kusto.gemspec +++ b/logstash-output-kusto.gemspec @@ -20,6 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies + s.add_runtime_dependency "rest-client", ">= 2.1.0" s.add_runtime_dependency 'logstash-core', '>= 8.3.0' s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json_lines' From 2ca4683f283e006738f93abb7a319e15bf57f312 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Thu, 11 Jan 2024 11:40:59 +0530 Subject: [PATCH 3/7] * Code and dependency fixes --- e2e/e2e.rb | 2 +- lib/logstash/outputs/kusto.rb | 10 +++--- lib/logstash/outputs/kusto/ingestor.rb | 36 +++++++++---------- .../outputs/kusto/kustoAadProvider.rb | 6 ++-- logstash-output-kusto.gemspec | 1 + 5 files changed, 27 insertions(+), 28 deletions(-) diff --git a/e2e/e2e.rb b/e2e/e2e.rb index 618f6739..3a13af49 100755 --- a/e2e/e2e.rb +++ b/e2e/e2e.rb @@ -82,7 +82,7 @@ def run_logstash logstashpath = File.absolute_path("logstash.conf") File.write(@output_file, "") File.write(@input_file, "") - lscommand = "/usr/share/logstash/bin/logstash -f #{logstashpath}" + lscommand = "/softwares/logstash/bin/logstash -f #{logstashpath}" puts "Running logstash from config path #{logstashpath} and final command #{lscommand}" spawn(lscommand) sleep(60) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 808c8ef2..f60dfa34 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -117,7 +117,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :proxy_port, validate: :number, required: false , default: 80 # Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this - config :proxy_protocol, validate: :string, required: false , default: 'http' + config :proxy_protocol, validate: :string, required: false , default: 'https' # Use proxy for AAD only. If true, the plugin will use the proxy only for AAD authentication and will not use it for the actual data transfer. config :proxy_aad_only, validate: :boolean, required: false , default: false @@ -152,12 +152,10 @@ def register File.dirname(path) end @failure_path = File.join(@file_root, @filename_failure) - - kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger) - kustoLogstashConfiguration.validate_config() + kusto_ls_config = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger) + kusto_ls_config.validate_config() executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) - @ingestor = Ingestor.new(kustoLogstashConfiguration, @logger, executor) - + @ingestor = Ingestor.new(kusto_ls_config, @logger, executor) # send existing files recover_past_files if recovery @last_stale_cleanup_cycle = Time.now diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index a19756f6..490997fb 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,20 +20,20 @@ class Ingestor ) LOW_QUEUE_LENGTH = 3 - def initialize(kustoLogstashConfiguration, logger, threadpool = DEFAULT_THREADPOOL) + def initialize(kustoLsConfig, logger, threadpool = DEFAULT_THREADPOOL) @logger = logger @workers_pool = threadpool - @kustoLogstashConfiguration = kustoLogstashConfiguration + @kusto_ls_config = kustoLsConfig @logger.info('Preparing Kusto resources.') @ingestion_properties = get_ingestion_properties() - if @kustoLogstashConfiguration.proxy_aad_only - @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration) + if @kusto_ls_config.proxy_aad_only + @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kusto_ls_config,@logger) end @logger.debug('Kusto resources are ready.') end def get_kusto_client() - if @kusto_client.nil? || (@kustoLogstashConfiguration.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh()) + if @kusto_client.nil? || (@kusto_ls_config.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh()) kusto_client = create_kusto_client() end return @kusto_client @@ -43,18 +43,18 @@ def create_kusto_client() kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http # Create a connection string - kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity - if @kustoLogstashConfiguration.is_system_assigned_managed_identity + kusto_connection_string = if @kusto_ls_config.is_managed_identity + if @kusto_ls_config.is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url, @kusto_ls_config.managed_identity_id) end - elsif @kustoLogstashConfiguration.proxy_aad_only - kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kustoLogstashConfiguration.ingest_url,@kustoLogstashConfiguration.get_aad_token_bearer()) + elsif @kusto_ls_config.proxy_aad_only + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kusto_ls_config.ingest_url,@kusto_ls_config.get_aad_token_bearer()) else - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.app_id, @kustoLogstashConfiguration.app_key.value, @kustoLogstashConfiguration.app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_ls_config.ingest_url, @kusto_ls_config.app_id, @kusto_ls_config.app_key.value, @kusto_ls_config.app_tenant) end # @logger.debug(Gem.loaded_specs.to_s) @@ -68,10 +68,10 @@ def create_kusto_client() kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin - if @kustoLogstashConfiguration.is_direct_conn || @kustoLogstashConfiguration.proxy_aad_only + if @kusto_ls_config.is_direct_conn || @kusto_ls_config.proxy_aad_only kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kustoLogstashConfiguration.proxy_host,@kustoLogstashConfiguration.proxy_port,@kustoLogstashConfiguration.proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_ls_config.proxy_host,@kusto_ls_config.proxy_port,@kusto_ls_config.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end @@ -79,10 +79,10 @@ def create_kusto_client() def get_ingestion_properties() kusto_java = Java::com.microsoft.azure.kusto - ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table) - if @kustoLogstashConfiguration.is_mapping_ref_provided - @logger.debug('Using mapping reference.', @kustoLogstashConfiguration.json_mapping) - ingestion_properties.setIngestionMapping(@kustoLogstashConfiguration.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_ls_config.database, @kusto_ls_config.table) + if @kusto_ls_config.is_mapping_ref_provided + @logger.debug('Using mapping reference.', @kusto_ls_config.json_mapping) + ingestion_properties.setIngestionMapping(@kusto_ls_config.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index 668d931d..fd2f5ec0 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -11,14 +11,14 @@ module LogStash module Outputs module KustoInternal class KustoAadTokenProvider - def initialize(kustoLogstashConfiguration) + def initialize(kustoLogstashConfiguration, logger) @kustoLogstashConfiguration = kustoLogstashConfiguration # Perform the auth initialization scope = CGI.escape(sprintf("%s/.default",kustoLogstashConfiguration.ingest_url)) @logger = logger @aad_uri = "https://login.microsoftonline.com" - @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key) - @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant) + @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", @kustoLogstashConfiguration.app_id, scope, @kustoLogstashConfiguration.app_key) + @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", @aad_uri, @kustoLogstashConfiguration.app_tenant) @token_state = { :access_token => nil, :expiry_time => nil, diff --git a/logstash-output-kusto.gemspec b/logstash-output-kusto.gemspec index c586c9d9..b037d79d 100755 --- a/logstash-output-kusto.gemspec +++ b/logstash-output-kusto.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "rest-client", ">= 2.1.0" + s.add_runtime_dependency "mime-types", ">= 3.3.1" s.add_runtime_dependency 'logstash-core', '>= 8.3.0' s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json_lines' From 5eae5c2d368c5219f7497708039f9ebd1eed901c Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Fri, 12 Jan 2024 17:38:11 +0530 Subject: [PATCH 4/7] * Additional fixes --- docker-e2e/Logstash-Docker | 8 ++++ docker-e2e/docker-compose-all.yml | 30 +++++++++++++ docker-e2e/logstash.yml | 6 +++ e2e/e2e.rb | 2 +- lib/logstash/outputs/kusto/ingestor.rb | 2 +- .../outputs/kusto/kustoAadProvider.rb | 45 +++++++++++-------- version | 2 +- 7 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 docker-e2e/Logstash-Docker create mode 100644 docker-e2e/docker-compose-all.yml create mode 100644 docker-e2e/logstash.yml diff --git a/docker-e2e/Logstash-Docker b/docker-e2e/Logstash-Docker new file mode 100644 index 00000000..32b4516b --- /dev/null +++ b/docker-e2e/Logstash-Docker @@ -0,0 +1,8 @@ +FROM docker.elastic.co/logstash/logstash-oss:8.10.0 +COPY logstash-output-kusto-2.0.5-java.gem /tmp/logstash-output-kusto-2.0.5-java.gem +RUN rm -f /usr/share/logstash/pipeline/logstash.conf && \ + bin/logstash-plugin install /tmp/logstash-output-kusto-2.0.5-java.gem +COPY logstash-output-kusto-apache.conf /usr/share/logstash/pipeline/logstash.conf +COPY logstash.yml /usr/share/logstash/config/logstash.yml +#COPY access-1.log /tmp/access_log/access-1.log +COPY email-agents.txt /tmp/access_log/access-1.log \ No newline at end of file diff --git a/docker-e2e/docker-compose-all.yml b/docker-e2e/docker-compose-all.yml new file mode 100644 index 00000000..5e5ea323 --- /dev/null +++ b/docker-e2e/docker-compose-all.yml @@ -0,0 +1,30 @@ +version: '3.8' +services: + logstash: + build: # "context" and "dockerfile" fields have to be under "build" + context: . + dockerfile: Logstash-Docker + hostname: logstash + environment: + - "LS_JAVA_OPTS=-Xms1024m -Xmx8192m" + ports: + - "9600:9600" + - "5044:5044" + deploy: + restart_policy: + condition: on-failure + squid-proxy: + hostname: squid-proxy + image: ubuntu/squid:latest + ports: + - "3128:3128" + environment: + - TZ=UTC + volumes: + - ./squid.conf:/etc/squid/squid.conf + # configs: + # - source: squid + # target: /etc/squid/squid.conf + deploy: + restart_policy: + condition: on-failure \ No newline at end of file diff --git a/docker-e2e/logstash.yml b/docker-e2e/logstash.yml new file mode 100644 index 00000000..7f1d1ae6 --- /dev/null +++ b/docker-e2e/logstash.yml @@ -0,0 +1,6 @@ +--- +## Default Logstash configuration from Logstash base image. +## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml +# +http.host: 0.0.0.0 +node.name: logstash \ No newline at end of file diff --git a/e2e/e2e.rb b/e2e/e2e.rb index 3a13af49..618f6739 100755 --- a/e2e/e2e.rb +++ b/e2e/e2e.rb @@ -82,7 +82,7 @@ def run_logstash logstashpath = File.absolute_path("logstash.conf") File.write(@output_file, "") File.write(@input_file, "") - lscommand = "/softwares/logstash/bin/logstash -f #{logstashpath}" + lscommand = "/usr/share/logstash/bin/logstash -f #{logstashpath}" puts "Running logstash from config path #{logstashpath} and final command #{lscommand}" spawn(lscommand) sleep(60) diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 490997fb..81f0f9ff 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -52,7 +52,7 @@ def create_kusto_client() kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url, @kusto_ls_config.managed_identity_id) end elsif @kusto_ls_config.proxy_aad_only - kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kusto_ls_config.ingest_url,@kusto_ls_config.get_aad_token_bearer()) + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kusto_ls_config.ingest_url,@kustoAadTokenProvider.get_aad_token_bearer()) else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_ls_config.ingest_url, @kusto_ls_config.app_id, @kusto_ls_config.app_key.value, @kusto_ls_config.app_tenant) end diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index fd2f5ec0..66a3148e 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -14,10 +14,10 @@ class KustoAadTokenProvider def initialize(kustoLogstashConfiguration, logger) @kustoLogstashConfiguration = kustoLogstashConfiguration # Perform the auth initialization - scope = CGI.escape(sprintf("%s/.default",kustoLogstashConfiguration.ingest_url)) + scope = CGI.escape(sprintf("%s/.default",@kustoLogstashConfiguration.ingest_url)) @logger = logger @aad_uri = "https://login.microsoftonline.com" - @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", @kustoLogstashConfiguration.app_id, scope, @kustoLogstashConfiguration.app_key) + @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", @kustoLogstashConfiguration.app_id, scope, @kustoLogstashConfiguration.app_key.value) @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", @aad_uri, @kustoLogstashConfiguration.app_tenant) @token_state = { :access_token => nil, @@ -65,32 +65,39 @@ def get_token_expiry_time(expires_in_seconds) # Post the given json to Azure Loganalytics def post_token_request() # Create REST request header - headers = get_header() while true begin proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port) + logdetails = sprintf("ProxyAAD=%s, URL=%s", @proxy_aad, @token_request_uri) + @logger.info("Refreshing token details : #{logdetails}") # Post REST request - response = RestClient::Request.execute(method::post, url:@token_request_uri, payload:@token_request_body, headers:headers, - proxy:proxy_aad) - - if (response.code == 200 || response.code == 201) - return JSON.parse(response.body) + response = RestClient::Request.new({ + method: :post, + url: @token_request_uri, + payload: @token_request_body, + headers: {content_type: 'application/x-www-form-urlencoded'}, + proxy: proxy_aad + }).execute do |response, request, result| + case response.code + when 400 + @logger.trace("Bad request while requesting token : #{@token_request_body}") + when 200 , 201 + return JSON.parse(response.body) + else + @logger.error("Unexpected error refreshing token details : #{logdetails}") + @logger.trace("Unexpected error payload : #{@token_request_body}") + fail "Invalid response #{response.to_str} received." + end end - rescue RestClient::ExceptionWithResponse => ewr + rescue RestClient::ExceptionWithResponse => ewr @logger.error("Exception while authenticating with AAD API ['#{ewr.response}']") - rescue Exception => ex - @logger.trace("Exception while authenticating with AAD API ['#{ex}']") + rescue Exception => ex + @logger.trace("Exception while authenticating with AAD API ['#{ex}']") end - @logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.") - sleep 10 + @logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.") + sleep 10 end end # def post_token_request - # Create a header - def get_header() - return { - 'Content-Type' => 'application/x-www-form-urlencoded', - } - end # def get_header end # class KustoAadTokenProvider end # module Kusto end # module Outputs diff --git a/version b/version index 26e33797..b9d2bdfd 100644 --- a/version +++ b/version @@ -1 +1 @@ -2.0.4 \ No newline at end of file +2.0.5 \ No newline at end of file From 84f7194092dedfd3eeee6a6b5fa65f72a40edcc7 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Fri, 12 Jan 2024 18:18:37 +0530 Subject: [PATCH 5/7] * Additional fixes --- lib/logstash/outputs/kusto/ingestor.rb | 2 +- lib/logstash/outputs/kusto/kustoAadProvider.rb | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 81f0f9ff..7999b701 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -52,7 +52,7 @@ def create_kusto_client() kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url, @kusto_ls_config.managed_identity_id) end elsif @kusto_ls_config.proxy_aad_only - kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kusto_ls_config.ingest_url,@kustoAadTokenProvider.get_aad_token_bearer()) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadAccessTokenAuthentication(@kusto_ls_config.ingest_url,@kustoAadTokenProvider.get_aad_token_bearer()) else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_ls_config.ingest_url, @kusto_ls_config.app_id, @kusto_ls_config.app_key.value, @kusto_ls_config.app_tenant) end diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index 66a3148e..7bf28810 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -51,6 +51,7 @@ def refresh_saved_token() token_response = post_token_request() @token_state[:access_token] = token_response["access_token"] @token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"]) + @logger.info("Token refreshed will expire at : #{@token_state[:expiry_time]}") end # def refresh_saved_token def get_token_expiry_time(expires_in_seconds) @@ -82,7 +83,7 @@ def post_token_request() when 400 @logger.trace("Bad request while requesting token : #{@token_request_body}") when 200 , 201 - return JSON.parse(response.body) + return JSON.parse(response.to_str) else @logger.error("Unexpected error refreshing token details : #{logdetails}") @logger.trace("Unexpected error payload : #{@token_request_body}") From 8ec8920489ab96604aced1e280c6e28396e09421 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 15 Jan 2024 13:09:46 +0530 Subject: [PATCH 6/7] * Updatecomment in code --- lib/logstash/outputs/kusto/kustoAadProvider.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index 7bf28810..55cb3871 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -59,7 +59,7 @@ def get_token_expiry_time(expires_in_seconds) return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours else return Time.now + expires_in_seconds - 30 * 60; - # Decrease by 30 seconds to be on the safe side + # Decrease by 30 min to be on the safe side end end # def get_token_expiry_time From 59b387b96c0072adf19108214bffaf09c99f02ef Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 15 Jan 2024 13:10:56 +0530 Subject: [PATCH 7/7] * Update docker file --- .gitignore | 3 +++ docker-e2e/Logstash-Docker | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index fc91a8ed..e6f0cf3d 100644 --- a/.gitignore +++ b/.gitignore @@ -61,3 +61,6 @@ rspec.xml e2e/output_file.txt logs.txt local-run.sh +docker-e2e/email-agents.txt +docker-e2e/access-1.log +docker-e2e/2023-12-15-12-fw-d-hub01.log diff --git a/docker-e2e/Logstash-Docker b/docker-e2e/Logstash-Docker index 32b4516b..853bf5a7 100644 --- a/docker-e2e/Logstash-Docker +++ b/docker-e2e/Logstash-Docker @@ -4,5 +4,4 @@ RUN rm -f /usr/share/logstash/pipeline/logstash.conf && \ bin/logstash-plugin install /tmp/logstash-output-kusto-2.0.5-java.gem COPY logstash-output-kusto-apache.conf /usr/share/logstash/pipeline/logstash.conf COPY logstash.yml /usr/share/logstash/config/logstash.yml -#COPY access-1.log /tmp/access_log/access-1.log -COPY email-agents.txt /tmp/access_log/access-1.log \ No newline at end of file +COPY --chown=logstash:logstash access-1.log /tmp/access_log/ \ No newline at end of file