4 changes: 4 additions & 0 deletions .gitignore
@@ -60,3 +60,7 @@ gradle/wrapper/gradle-wrapper.properties
rspec.xml
e2e/output_file.txt
logs.txt
local-run.sh
docker-e2e/email-agents.txt
docker-e2e/access-1.log
docker-e2e/2023-12-15-12-fw-d-hub01.log
7 changes: 7 additions & 0 deletions docker-e2e/Logstash-Docker
@@ -0,0 +1,7 @@
FROM docker.elastic.co/logstash/logstash-oss:8.10.0
COPY logstash-output-kusto-2.0.5-java.gem /tmp/logstash-output-kusto-2.0.5-java.gem
RUN rm -f /usr/share/logstash/pipeline/logstash.conf && \
bin/logstash-plugin install /tmp/logstash-output-kusto-2.0.5-java.gem
COPY logstash-output-kusto-apache.conf /usr/share/logstash/pipeline/logstash.conf
COPY logstash.yml /usr/share/logstash/config/logstash.yml
COPY --chown=logstash:logstash access-1.log /tmp/access_log/
30 changes: 30 additions & 0 deletions docker-e2e/docker-compose-all.yml
@@ -0,0 +1,30 @@
version: '3.8'
services:
logstash:
build: # "context" and "dockerfile" fields have to be under "build"
context: .
dockerfile: Logstash-Docker
hostname: logstash
environment:
- "LS_JAVA_OPTS=-Xms1024m -Xmx8192m"
ports:
- "9600:9600"
- "5044:5044"
deploy:
restart_policy:
condition: on-failure
squid-proxy:
hostname: squid-proxy
image: ubuntu/squid:latest
ports:
- "3128:3128"
environment:
- TZ=UTC
volumes:
- ./squid.conf:/etc/squid/squid.conf
# configs:
# - source: squid
# target: /etc/squid/squid.conf
deploy:
restart_policy:
condition: on-failure
6 changes: 6 additions & 0 deletions docker-e2e/logstash.yml
@@ -0,0 +1,6 @@
---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0
node.name: logstash
19 changes: 9 additions & 10 deletions lib/logstash/outputs/kusto.rb
@@ -6,6 +6,7 @@

require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'

##
# This plugin sends messages to Azure Kusto in batches.
@@ -116,7 +117,10 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :proxy_port, validate: :number, required: false , default: 80

# Check Proxy URL can be over http or https. Do we need it this way, or should we ignore/remove this?
config :proxy_protocol, validate: :string, required: false , default: 'http'
config :proxy_protocol, validate: :string, required: false , default: 'https'

# Use proxy for AAD only. If true, the plugin will use the proxy only for AAD authentication and will not use it for the actual data transfer.
config :proxy_aad_only, validate: :boolean, required: false , default: false

default :codec, 'json_lines'

@@ -148,17 +152,12 @@ def register
File.dirname(path)
end
@failure_path = File.join(@file_root, @filename_failure)

executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)

kusto_ls_config = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger)
kusto_ls_config.validate_config()
executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs)
@ingestor = Ingestor.new(kusto_ls_config, @logger, executor)
# send existing files
recover_past_files if recovery

@last_stale_cleanup_cycle = Time.now

@flush_interval = @flush_interval.to_i
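Note: the new register flow above builds a single KustoLogstashConfiguration object, validates it, and hands it to the Ingestor. That class lives in the new lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb, which is not shown in this excerpt. As rough orientation only, a minimal sketch of the accessor surface implied by the call sites in kusto.rb and ingestor.rb could look like the following; the member names are taken from those call sites, while the Struct base and the validate_config body are assumptions and may differ from the actual file in this PR.

# Hypothetical sketch only -- not the actual kustoLogstashConfiguration.rb added by this PR.
module LogStash
  module Outputs
    module KustoInternal
      class KustoLogstashConfiguration < Struct.new(
          :ingest_url, :app_id, :app_key, :app_tenant, :managed_identity_id,
          :database, :table, :json_mapping, :delete_local,
          :proxy_host, :proxy_port, :proxy_protocol, :proxy_aad_only, :logger)

        # Derived flags the ingestor relies on
        def is_managed_identity
          app_id.nil? && app_key.nil?
        end

        def is_system_assigned_managed_identity
          is_managed_identity && 0 == "system".casecmp(managed_identity_id)
        end

        def is_direct_conn
          proxy_host.nil? || proxy_host.empty?
        end

        def is_mapping_ref_provided
          !(json_mapping.nil? || json_mapping.empty?)
        end

        def validate_config()
          # Mirrors the checks removed from ingestor.rb below: credentials present,
          # database/table/json_mapping not dynamic, proxy_protocol limited to http/https.
          unless %w[http https].include?(proxy_protocol)
            raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
          end
          true
        end
      end
    end
  end
end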
111 changes: 39 additions & 72 deletions lib/logstash/outputs/kusto/ingestor.rb
@@ -1,8 +1,9 @@
# encoding: utf-8

require 'logstash/outputs/base'
require 'logstash/namespace'
require 'logstash/errors'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'
require 'logstash/outputs/kusto/kustoAadProvider'

class LogStash::Outputs::Kusto < LogStash::Outputs::Base
##
Expand All @@ -18,34 +19,42 @@ class Ingestor
fallback_policy: :caller_runs
)
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
def initialize(kustoLsConfig, logger, threadpool = DEFAULT_THREADPOOL)
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id)
@workers_pool = threadpool
@kusto_ls_config = kustoLsConfig
@logger.info('Preparing Kusto resources.')
@ingestion_properties = get_ingestion_properties()
if @kusto_ls_config.proxy_aad_only
@kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kusto_ls_config,@logger)
end
@logger.debug('Kusto resources are ready.')
end

def get_kusto_client()
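# Lazily build the ingest client: create it on first use, and rebuild it when proxy_aad_only
# is set and the cached AAD bearer token is close to expiry, since that token is baked into
# the connection string used to construct the client.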
if @kusto_client.nil? || (@kusto_ls_config.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh())
kusto_client = create_kusto_client()
end
return @kusto_client
end

def create_kusto_client()
kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
is_managed_identity = (app_id.nil? && app_key.nil?)
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
# Is it direct connection
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
# Create a connection string
kusto_connection_string = if is_managed_identity
if is_system_assigned_managed_identity
kusto_connection_string = if @kusto_ls_config.is_managed_identity
if @kusto_ls_config.is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_ls_config.ingest_url, @kusto_ls_config.managed_identity_id)
end
elsif @kusto_ls_config.proxy_aad_only
kusto_java.data.auth.ConnectionStringBuilder.createWithAadAccessTokenAuthentication(@kusto_ls_config.ingest_url,@kustoAadTokenProvider.get_aad_token_bearer())
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_ls_config.ingest_url, @kusto_ls_config.app_id, @kusto_ls_config.app_key.value, @kusto_ls_config.app_tenant)
end
#
@logger.debug(Gem.loaded_specs.to_s)
Expand All @@ -59,53 +68,26 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());

@kusto_client = begin
if is_direct_conn
if @kusto_ls_config.is_direct_conn || @kusto_ls_config.proxy_aad_only
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_ls_config.proxy_host,@kusto_ls_config.proxy_port,@kusto_ls_config.proxy_protocol)).build()
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
end
end
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
if is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
def get_ingestion_properties()
kusto_java = Java::com.microsoft.azure.kusto
ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_ls_config.database, @kusto_ls_config.table)
if @kusto_ls_config.is_mapping_ref_provided
@logger.debug('Using mapping reference.', @kusto_ls_config.json_mapping)
ingestion_properties.setIngestionMapping(@kusto_ls_config.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end

def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
end

if table =~ FIELD_REF
@logger.error('table config value should not be dynamic.', table)
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
end

if json_mapping =~ FIELD_REF
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
end

if not(["https", "http"].include? proxy_protocol)
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end

return ingestion_properties
end

def upload_async(path, delete_on_success)
Expand All @@ -125,24 +107,9 @@ def upload_async(path, delete_on_success)
def upload(path, delete_on_success)
file_size = File.size(path)
@logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

# TODO: dynamic routing
# file_metadata = path.partition('.kusto.').last
# file_metadata_parts = file_metadata.split('.')

# if file_metadata_parts.length == 3
# # this is the number we expect - database, table, json_mapping
# database = file_metadata_parts[0]
# table = file_metadata_parts[1]
# json_mapping = file_metadata_parts[2]

# local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
# local_ingestion_properties.addJsonMappingName(json_mapping)
# end

if file_size > 0
file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file
@kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
get_kusto_client().ingestFromFile(file_source_info, @ingestion_properties)
else
@logger.warn("File #{path} is an empty file and is not ingested.")
end
Expand All @@ -167,4 +134,4 @@ def stop
@workers_pool.wait_for_termination(nil) # block until it's done
end
end
end
end
106 changes: 106 additions & 0 deletions lib/logstash/outputs/kusto/kustoAadProvider.rb
@@ -0,0 +1,106 @@
# encoding: utf-8
require 'cgi'
require 'rest-client'
require 'json'
require 'openssl'
require 'base64'
require 'time'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'


module LogStash
module Outputs
module KustoInternal
class KustoAadTokenProvider
def initialize(kustoLogstashConfiguration, logger)
@kustoLogstashConfiguration = kustoLogstashConfiguration
# Perform the auth initialization
scope = CGI.escape(sprintf("%s/.default",@kustoLogstashConfiguration.ingest_url))
@logger = logger
@aad_uri = "https://login.microsoftonline.com"
@token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", @kustoLogstashConfiguration.app_id, scope, @kustoLogstashConfiguration.app_key.value)
@token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", @aad_uri, @kustoLogstashConfiguration.app_tenant)
@token_state = {
:access_token => nil,
:expiry_time => nil,
:token_details_mutex => Mutex.new,
}
end # def initialize

# Public methods
public

def get_aad_token_bearer()
@token_state[:token_details_mutex].synchronize do
if is_saved_token_need_refresh()
refresh_saved_token()
end
return @token_state[:access_token]
end
end # def get_aad_token_bearer


def is_saved_token_need_refresh()
return @token_state[:access_token].nil? || @token_state[:expiry_time].nil? || @token_state[:expiry_time] <= Time.now
end # def is_saved_token_need_refresh

# Private methods
private

def refresh_saved_token()
@logger.info("aad token expired - refreshing token.")
token_response = post_token_request()
@token_state[:access_token] = token_response["access_token"]
@token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"])
@logger.info("Token refreshed will expire at : #{@token_state[:expiry_time]}")
end # def refresh_saved_token

def get_token_expiry_time(expires_in_seconds)
if (expires_in_seconds.nil? || expires_in_seconds <= 0)
return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours
else
return Time.now + expires_in_seconds - 30 * 60;
# Decrease by 30 min to be on the safe side
end
end # def get_token_expiry_time

# Post the token request to the AAD token endpoint and return the parsed JSON response
def post_token_request()
# Create REST request header
while true
begin
proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port)
logdetails = sprintf("ProxyAAD=%s, URL=%s", proxy_aad, @token_request_uri)
@logger.info("Refreshing token details : #{logdetails}")
# Post REST request
response = RestClient::Request.new({
method: :post,
url: @token_request_uri,
payload: @token_request_body,
headers: {content_type: 'application/x-www-form-urlencoded'},
proxy: proxy_aad
}).execute do |response, request, result|
case response.code
when 400
@logger.trace("Bad request while requesting token : #{@token_request_body}")
when 200 , 201
return JSON.parse(response.to_str)
else
@logger.error("Unexpected error refreshing token details : #{logdetails}")
@logger.trace("Unexpected error payload : #{@token_request_body}")
fail "Invalid response #{response.to_str} received."
end
end
rescue RestClient::ExceptionWithResponse => ewr
@logger.error("Exception while authenticating with AAD API ['#{ewr.response}']")
rescue Exception => ex
@logger.trace("Exception while authenticating with AAD API ['#{ex}']")
end
@logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.")
sleep 10
end
end # def post_token_request
end # class KustoAadTokenProvider
end # module Kusto
end # module Outputs
end # module LogStash
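The ingestor only uses two methods on this provider: is_saved_token_need_refresh (to decide whether the ingest client must be rebuilt) and get_aad_token_bearer (to mint the access-token connection string). A minimal usage sketch follows; the config object, its values, and the logger are illustrative stand-ins for what kusto.rb actually builds, not part of this PR.

# Illustrative only: placeholder config/logger instead of the real KustoLogstashConfiguration.
require 'logger'
require 'logstash/outputs/kusto/kustoAadProvider'

app_key = Struct.new(:value).new('app-secret')   # mimics LogStash::Util::Password#value
config_struct = Struct.new(:ingest_url, :app_id, :app_key, :app_tenant, :proxy_host, :proxy_port, :proxy_protocol)
config = config_struct.new('https://ingest-cluster.kusto.windows.net', 'app-id', app_key, 'tenant-id', 'squid-proxy', 3128, 'http')

provider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(config, Logger.new($stdout))

# The first call posts to login.microsoftonline.com through the proxy and caches the token;
# subsequent calls reuse it until roughly 30 minutes before expiry.
bearer = provider.get_aad_token_bearer()
provider.is_saved_token_need_refresh()   # => false while the cached token is still fresh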
