Skip to content

Commit a052928

Browse files
committed
✨ Add use_metadata option
1 parent 7f1099e commit a052928

File tree

5 files changed

+188
-6
lines changed

5 files changed

+188
-6
lines changed

docs/index.asciidoc

+23
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the
367367
| <<plugins-{type}s-{plugin}-truststore_password>> |<<password,password>>|No
368368
| <<plugins-{type}s-{plugin}-upsert>> |<<string,string>>|No
369369
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
370+
| <<plugins-{type}s-{plugin}-use_metadata>> |<<boolean,boolean>>|No
370371
| <<plugins-{type}s-{plugin}-validate_after_inactivity>> |<<number,number>>|No
371372
| <<plugins-{type}s-{plugin}-version>> |<<string,string>>|No
372373
| <<plugins-{type}s-{plugin}-version_type>> |<<string,string>>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No
@@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn'
10951096

10961097
Username to authenticate to a secure Elasticsearch cluster
10971098

1099+
[id="plugins-{type}s-{plugin}-use_metadata"]
1100+
===== `use_metadata`
1101+
1102+
* Value type is <<boolean,boolean>>
1103+
* Default value is `false`
1104+
1105+
Use and preference output parameters defined in the document metadata. The <<plugins-{type}s-{plugin}-index>> (`@metadata._index`), <<plugins-{type}s-{plugin}-document_id>> (`@metadata._id_`), and <<plugins-{type}s-{plugin}-pipeline>> (`@metadata.pipeline`) can be set by their respective `@metadata` fields.
1106+
1107+
E.g. to index a document to index `myindex` with id `myid` with the ingest pipeline `mypipeline`:
1108+
1109+
[source,json]
1110+
-----
1111+
{
1112+
"message": "foo",
1113+
"@metadata": {
1114+
"_index": "myindex",
1115+
"_id": "myid",
1116+
"pipeline": "mypipeline"
1117+
}
1118+
}
1119+
-----
1120+
10981121
[id="plugins-{type}s-{plugin}-validate_after_inactivity"]
10991122
===== `validate_after_inactivity`
11001123

lib/logstash/outputs/elasticsearch.rb

+19-4
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@
7575
#
7676
# ==== HTTP Compression
7777
#
78-
# This plugin supports request and response compression. Response compression is enabled by default and
79-
# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
80-
# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
78+
# This plugin supports request and response compression. Response compression is enabled by default and
79+
# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
80+
# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
8181
# Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
8282
#
83-
# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
83+
# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
8484
# setting in their Logstash config file.
8585
#
8686
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
251251
# ILM policy to use, if undefined the default policy will be used.
252252
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
253253

254+
# ILM policy to use, if undefined the default policy will be used.
255+
config :use_metadata, :validate => :boolean, :default => false
256+
254257
attr_reader :client
255258
attr_reader :default_index
256259
attr_reader :default_ilm_rollover_alias
@@ -419,13 +422,25 @@ def common_event_params(event)
419422
}
420423

421424
if @pipeline
425+
logger.debug("Pipeline params BEFORE", params: params)
422426
value = event.sprintf(@pipeline)
423427
# convention: empty string equates to not using a pipeline
424428
# this is useful when using a field reference in the pipeline setting, e.g.
425429
# elasticsearch {
426430
# pipeline => "%{[@metadata][pipeline]}"
427431
# }
428432
params[:pipeline] = value unless value.empty?
433+
logger.debug("Pipeline params AFTER", params: params)
434+
end
435+
436+
if @use_metadata
437+
logger.debug("@metadata params BEFORE", params: params)
438+
params[:_id] = event.get("[@metadata][_id]") || params[:_id]
439+
event_index = event.get("[@metadata][_index]")
440+
params[:_index] = event.sprintf(event_index) if event_index && !event_index.empty?
441+
event_pipeline = event.get("[@metadata][pipeline]")
442+
params[:pipeline] = event.sprintf(event_pipeline) if event_pipeline && !event_pipeline.empty?
443+
logger.debug("@metadata params AFTER", params: params)
429444
end
430445

431446
params

spec/integration/outputs/index_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
end
4747

4848
describe "indexing" do
49-
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
49+
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) }
5050
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
5151
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
5252
let(:event_count) { 1 + rand(2) }

spec/integration/outputs/index_version_spec.rb

+27
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,32 @@
9494
expect(r2["_source"]["message"]).to eq('foo')
9595
end
9696
end
97+
98+
describe "use metadata" do
99+
let(:settings) do
100+
{
101+
"index" => "logstash-index",
102+
"hosts" => get_host_port(),
103+
"use_metadata" => true,
104+
}
105+
end
106+
107+
it "should use @metadata._id for document_id" do
108+
id = "new_doc_id_1"
109+
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")])
110+
r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true)
111+
expect(r["_id"]).to eq(id)
112+
expect(r["_source"]["message"]).to eq("foo")
113+
end
114+
it "should use @metadata._index for index" do
115+
id = "new_doc_id_2"
116+
new_index = "logstash-index-new"
117+
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")])
118+
r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true)
119+
expect(r["_id"]).to eq(id)
120+
expect(r["_index"]).to eq(new_index)
121+
expect(r["_source"]["message"]).to eq("foo")
122+
end
123+
end
97124
end
98125
end

spec/integration/outputs/ingest_pipeline_spec.rb

+118-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
end
6161

6262
it "indexes using the proper pipeline" do
63-
results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"")
63+
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
6464
expect(results).to have_hits(1)
6565
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
6666
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
@@ -72,3 +72,120 @@
7272
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
7373
end
7474
end
75+
76+
describe "Ingest pipeline from metadata", :integration => true do
77+
subject! do
78+
require "logstash/outputs/elasticsearch"
79+
settings = {
80+
"hosts" => "#{get_host_port()}",
81+
"pipeline" => "apache-logs",
82+
"data_stream" => "false",
83+
"use_metadata" => true,
84+
}
85+
next LogStash::Outputs::ElasticSearch.new(settings)
86+
end
87+
88+
let(:http_client) { Manticore::Client.new }
89+
let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" }
90+
let(:apache_logs_pipeline) {
91+
'
92+
{
93+
"description" : "Pipeline to parse Apache logs",
94+
"processors" : [
95+
{
96+
"grok": {
97+
"field": "message",
98+
"patterns": ["%{COMBINEDAPACHELOG}"]
99+
}
100+
}
101+
]
102+
}'
103+
}
104+
105+
let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" }
106+
let(:add_field_logs_pipeline) {
107+
'
108+
{
109+
"description": "Add field foo with value bar",
110+
"processors": [
111+
{
112+
"set": {
113+
"field": "foo",
114+
"value": "bar"
115+
}
116+
}
117+
]
118+
}'
119+
}
120+
121+
before :each do
122+
# Delete all templates first.
123+
require "elasticsearch"
124+
125+
# Clean ES of data before we start.
126+
@es = get_client
127+
@es.indices.delete_template(:name => "*")
128+
129+
# This can fail if there are no indexes, ignore failure.
130+
@es.indices.delete(:index => "*") rescue nil
131+
132+
# delete existing ingest pipeline
133+
http_client.delete(ingest_url).call
134+
135+
# register pipelines
136+
http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
137+
http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
138+
139+
#TODO: Use esclient
140+
#@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_defintion
141+
142+
subject.register
143+
subject.multi_receive([
144+
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'),
145+
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }),
146+
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id2", "_index" => "index2", "pipeline" => "" }),
147+
])
148+
@es.indices.refresh
149+
150+
#Wait or fail until everything's indexed.
151+
Stud::try(10.times) do
152+
r = @es.search(index: "logstash-*")
153+
expect(r).to have_hits(1)
154+
r = @es.search(index: "index1")
155+
expect(r).to have_hits(1)
156+
r = @es.search(index: "index2")
157+
expect(r).to have_hits(1)
158+
sleep(0.1)
159+
end
160+
end
161+
162+
it "indexes using the correct pipeline when @metadata.pipeline not defined" do
163+
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
164+
expect(results).to have_hits(1)
165+
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
166+
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
167+
expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET")
168+
expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver")
169+
expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-")
170+
expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-")
171+
expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50")
172+
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
173+
end
174+
175+
it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do
176+
results = @es.search(:index => "index1", :q => "message:\"netcat\"")
177+
expect(results).to have_hits(1)
178+
expect(results["hits"]["hits"][0]["_id"]).to eq("id1")
179+
expect(results["hits"]["hits"][0]["_index"]).to eq("index1")
180+
expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar")
181+
end
182+
183+
it "indexes ignore empty @metadata.pipeline values" do
184+
results = @es.search(:index => "index2", :q => "message:\"netcat\"")
185+
expect(results).to have_hits(1)
186+
expect(results["hits"]["hits"][0]["_id"]).to eq("id2")
187+
expect(results["hits"]["hits"][0]["_index"]).to eq("index2")
188+
expect(results["hits"]["hits"][0]["_source"]).not_to include("foo")
189+
end
190+
191+
end

0 commit comments

Comments
 (0)