Skip to content

Commit 4312df7

Browse files
authored
Index Lifecycle Management Support (#805)
This commit adds Index Lifecycle Management (ILM) support to the Elasticsearch output. It will write Index Lifecycle Management settings to managed templates, create a default ILM policy (where indices will automatically rollover if they either reach `50GB` in size or a age of `30 days`) if required, or use policies predefined in Elasticsearch if preferred. Rollover aliases are automatically created, and data will be ingested to those aliases.
1 parent 303a3f0 commit 4312df7

File tree

18 files changed

+1009
-13
lines changed

18 files changed

+1009
-13
lines changed

ci/run.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ else
131131
extra_tag_args="--tag secure_integration"
132132
fi
133133

134+
if [[ "$DISTRIBUTION" == "oss" ]]; then
135+
extra_tag_args="$extra_tag_args --tag distribution:oss --tag ~distribution:xpack"
136+
elif [[ "$DISTRIBUTION" == "default" ]]; then
137+
extra_tag_args="$extra_tag_args --tag ~distribution:oss --tag distribution:xpack"
138+
fi
139+
134140
case "$ES_VERSION" in
135141
LATEST-SNAPSHOT-*)
136142
split_latest=${ES_VERSION##*-}

lib/logstash/outputs/elasticsearch.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,17 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
9090
require "logstash/outputs/elasticsearch/http_client_builder"
9191
require "logstash/outputs/elasticsearch/common_configs"
9292
require "logstash/outputs/elasticsearch/common"
93+
require "logstash/outputs/elasticsearch/ilm"
9394

9495
# Protocol agnostic (i.e. non-http, non-java specific) configs go here
9596
include(LogStash::Outputs::ElasticSearch::CommonConfigs)
9697

9798
# Protocol agnostic methods
9899
include(LogStash::Outputs::ElasticSearch::Common)
99100

101+
# Methods for ILM support
102+
include(LogStash::Outputs::ElasticSearch::Ilm)
103+
100104
config_name "elasticsearch"
101105

102106
# The Elasticsearch action to perform. Valid actions are:

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ def register
2323

2424
setup_hosts # properly sets @hosts
2525
build_client
26+
setup_after_successful_connection
2627
check_action_validity
2728
@bulk_request_metrics = metric.namespace(:bulk_requests)
2829
@document_level_metrics = metric.namespace(:documents)
29-
install_template_after_successful_connection
3030
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
3131
end
3232

@@ -38,15 +38,19 @@ def multi_receive(events)
3838
retrying_submit(events.map {|e| event_action_tuple(e)})
3939
end
4040

41-
def install_template_after_successful_connection
41+
def setup_after_successful_connection
4242
@template_installer ||= Thread.new do
4343
sleep_interval = @retry_initial_interval
4444
until successful_connection? || @stopping.true?
4545
@logger.debug("Waiting for connectivity to Elasticsearch cluster. Retrying in #{sleep_interval}s")
4646
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
4747
sleep_interval = next_sleep_interval(sleep_interval)
4848
end
49-
install_template if successful_connection?
49+
if successful_connection?
50+
verify_ilm_readiness if ilm_enabled?
51+
install_template
52+
setup_ilm if ilm_enabled?
53+
end
5054
end
5155
end
5256

@@ -114,7 +118,6 @@ def maximum_seen_major_version
114118
client.maximum_seen_major_version
115119
end
116120

117-
118121
def routing_field_name
119122
maximum_seen_major_version >= 6 ? :routing : :_routing
120123
end
@@ -353,4 +356,4 @@ def dlq_enabled?
353356
!execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
354357
end
355358
end
356-
end; end; end
359+
end end end

lib/logstash/outputs/elasticsearch/common_configs.rb

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
module LogStash; module Outputs; class ElasticSearch
44
module CommonConfigs
5+
6+
DEFAULT_INDEX_NAME = "logstash-%{+YYYY.MM.dd}"
7+
DEFAULT_POLICY = "logstash-policy"
8+
59
def self.included(mod)
610
# The index to write events to. This can be dynamic using the `%{foo}` syntax.
711
# The default value will partition your indices by day so you can more easily
@@ -10,7 +14,7 @@ def self.included(mod)
1014
# For weekly indexes ISO 8601 format is recommended, eg. logstash-%{+xxxx.ww}.
1115
# LS uses Joda to format the index pattern from event timestamp.
1216
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
13-
mod.config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}"
17+
mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME
1418

1519
mod.config :document_type,
1620
:validate => :string,
@@ -136,6 +140,24 @@ def self.included(mod)
136140
# Set which ingest pipeline you wish to execute for an event. You can also use event dependent configuration
137141
# here like `pipeline => "%{INGEST_PIPELINE}"`
138142
mod.config :pipeline, :validate => :string, :default => nil
143+
144+
145+
# -----
146+
# ILM configurations (beta)
147+
# -----
148+
# Flag for enabling Index Lifecycle Management integration.
149+
mod.config :ilm_enabled, :validate => :boolean, :default => false
150+
151+
# Rollover alias used for indexing data. If rollover alias doesn't exist, Logstash will create it and map it to the relevant index
152+
mod.config :ilm_rollover_alias, :validate => :string, :default => 'logstash'
153+
154+
# appends “{now/d}-000001” by default for new index creation, subsequent rollover indices will increment based on this pattern i.e. “000002”
155+
# {now/d} is date math, and will insert the appropriate value automatically.
156+
mod.config :ilm_pattern, :validate => :string, :default => '{now/d}-000001'
157+
158+
# ILM policy to use, if undefined the default policy will be used.
159+
mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
160+
139161
end
140162
end
141163
end end end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"policy" : {
3+
"phases": {
4+
"hot" : {
5+
"actions" : {
6+
"rollover" : {
7+
"max_size" : "50gb",
8+
"max_age":"30d"
9+
}
10+
}
11+
}
12+
}
13+
}
14+
}

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,17 +337,62 @@ def host_to_url(h)
337337
::LogStash::Util::SafeURI.new(raw_url)
338338
end
339339

340-
def template_exists?(name)
341-
response = @pool.head("/_template/#{name}")
340+
def exists?(path, use_get=false)
341+
response = use_get ? @pool.get(path) : @pool.head(path)
342342
response.code >= 200 && response.code <= 299
343343
end
344344

345+
def template_exists?(name)
346+
exists?("/_template/#{name}")
347+
end
348+
345349
def template_put(name, template)
346350
path = "_template/#{name}"
347351
logger.info("Installing elasticsearch template to #{path}")
348352
@pool.put(path, nil, LogStash::Json.dump(template))
349353
end
350354

355+
# ILM methods
356+
357+
# check whether rollover alias already exists
358+
def rollover_alias_exists?(name)
359+
exists?(name)
360+
end
361+
362+
# Create a new rollover alias
363+
def rollover_alias_put(alias_name, alias_definition)
364+
logger.info("Creating rollover alias #{alias_name}")
365+
begin
366+
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
367+
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
368+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
369+
if e.response_code == 400
370+
logger.info("Rollover Alias #{alias_name} already exists. Skipping")
371+
return
372+
end
373+
raise e
374+
end
375+
end
376+
377+
def get_xpack_info
378+
get("/_xpack")
379+
end
380+
381+
def get_ilm_endpoint
382+
@pool.get("/_ilm/policy")
383+
end
384+
385+
def ilm_policy_exists?(name)
386+
exists?("/_ilm/policy/#{name}", true)
387+
end
388+
389+
def ilm_policy_put(name, policy)
390+
path = "_ilm/policy/#{name}"
391+
logger.info("Installing ILM policy #{policy} to #{path}")
392+
@pool.put(path, nil, LogStash::Json.dump(policy))
393+
end
394+
395+
351396
# Build a bulk item for an elasticsearch update action
352397
def update_action_builder(args, source)
353398
if args[:_script]

lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ def format_url(url, path_and_query=nil)
103103
end
104104

105105
request_uri.query = new_query_parts.join("&") unless new_query_parts.empty?
106-
107-
request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.path}".gsub(/\/{2,}/, "/")
108-
106+
107+
request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.raw_path}".gsub(/\/{2,}/, "/")
108+
109109
request_uri
110110
end
111111

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
module LogStash; module Outputs; class ElasticSearch
2+
module Ilm
3+
4+
ILM_POLICY_PATH = "default-ilm-policy.json"
5+
6+
def setup_ilm
7+
return unless ilm_enabled?
8+
@logger.info("Using Index lifecycle management - this feature is currently in beta.")
9+
@logger.warn "Overwriting supplied index name with rollover alias #{@ilm_rollover_alias}" if @index != LogStash::Outputs::ElasticSearch::CommonConfigs::DEFAULT_INDEX_NAME
10+
@index = ilm_rollover_alias
11+
12+
maybe_create_rollover_alias
13+
maybe_create_ilm_policy
14+
end
15+
16+
def ilm_enabled?
17+
@ilm_enabled
18+
end
19+
20+
def verify_ilm_readiness
21+
return unless ilm_enabled?
22+
23+
# Check the Elasticsearch instance for ILM readiness - this means that the version has to be a non-OSS release, with ILM feature
24+
# available and enabled.
25+
begin
26+
xpack = client.get_xpack_info
27+
features = xpack["features"]
28+
ilm = features.nil? ? nil : features["ilm"]
29+
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster" if features.nil? || ilm.nil?
30+
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not available in your Elasticsearch cluster" unless ilm['available']
31+
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not enabled in your Elasticsearch cluster" unless ilm['enabled']
32+
33+
unless ilm_policy_default? || client.ilm_policy_exists?(ilm_policy)
34+
raise LogStash::ConfigurationError, "The specified ILM policy #{ilm_policy} does not exist on your Elasticsearch instance"
35+
end
36+
37+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
38+
# Check xpack endpoint: If no xpack endpoint, then this version of Elasticsearch is not compatible
39+
if e.response_code == 404
40+
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster"
41+
elsif e.response_code == 400
42+
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster"
43+
else
44+
raise e
45+
end
46+
end
47+
end
48+
49+
private
50+
51+
def ilm_policy_default?
52+
ilm_policy == LogStash::Outputs::ElasticSearch::DEFAULT_POLICY
53+
end
54+
55+
def maybe_create_ilm_policy
56+
if ilm_policy_default? && !client.ilm_policy_exists?(ilm_policy)
57+
client.ilm_policy_put(ilm_policy, policy_payload)
58+
end
59+
end
60+
61+
def maybe_create_rollover_alias
62+
client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias)
63+
end
64+
65+
def rollover_alias_target
66+
"<#{ilm_rollover_alias}-#{ilm_pattern}>"
67+
end
68+
69+
def rollover_alias_payload
70+
{
71+
'aliases' => {
72+
ilm_rollover_alias =>{
73+
'is_write_index' => true
74+
}
75+
}
76+
}
77+
end
78+
79+
def policy_payload
80+
policy_path = ::File.expand_path(ILM_POLICY_PATH, ::File.dirname(__FILE__))
81+
LogStash::Json.load(::IO.read(policy_path))
82+
end
83+
end
84+
end end end

lib/logstash/outputs/elasticsearch/template_manager.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ def self.install_template(plugin)
55
return unless plugin.manage_template
66
plugin.logger.info("Using mapping template from", :path => plugin.template)
77
template = get_template(plugin.template, plugin.maximum_seen_major_version)
8+
add_ilm_settings_to_template(plugin, template) if plugin.ilm_enabled?
89
plugin.logger.info("Attempting to install template", :manage_template => template)
9-
install(plugin.client, plugin.template_name, template, plugin.template_overwrite)
10+
install(plugin.client, template_name(plugin), template, plugin.template_overwrite)
1011
rescue => e
1112
plugin.logger.error("Failed to install template.", :message => e.message, :class => e.class.name, :backtrace => e.backtrace)
1213
end
@@ -21,6 +22,25 @@ def self.install(client, template_name, template, template_overwrite)
2122
client.template_install(template_name, template, template_overwrite)
2223
end
2324

25+
def self.add_ilm_settings_to_template(plugin, template)
26+
plugin.logger.info("Overwriting index patterns, as ILM is enabled.")
27+
# Overwrite any index patterns, and use the rollover alias. Use 'index_patterns' rather than 'template' for pattern
28+
# definition - remove any existing definition of 'template'
29+
template.delete('template') if template.include?('template')
30+
template['index_patterns'] = "#{plugin.ilm_rollover_alias}-*"
31+
if template['settings'] && (template['settings']['index.lifecycle.name'] || template['settings']['index.lifecycle.rollover_alias'])
32+
plugin.logger.info("Overwriting index lifecycle name and rollover alias as ILM is enabled.")
33+
end
34+
template['settings'].update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
35+
end
36+
37+
# Template name - if template_name set, use it
38+
# if not and ILM is enabled, use the rollover alias
39+
# else use the default value of template_name
40+
def self.template_name(plugin)
41+
plugin.ilm_enabled? && !plugin.original_params.key?('template_name') ? plugin.ilm_rollover_alias : plugin.template_name
42+
end
43+
2444
def self.default_template_path(es_major_version)
2545
template_version = es_major_version == 1 ? 2 : es_major_version
2646
default_template_name = "elasticsearch-template-es#{template_version}x.json"

0 commit comments

Comments
 (0)