From dde8922094d5de905d74770fa125c2d942021304 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 25 Oct 2023 14:44:00 +0200 Subject: [PATCH 01/11] Added baseline test to cover integration fields replacement --- spec/unit/outputs/elasticsearch_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 56df89ab..0be27d00 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -271,6 +271,24 @@ end end + describe "with event integration metadata" do + context "when user doesn't specify index setting and the event present an index field in metadata" do + let(:event) { LogStash::Event.new({"[@metadata][_ingest_document][index]" => "meta-document-index"}) } + + it "use the index provided by the integration" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + + context "when user doesn't specify document_id setting and the event's contains one" do + let(:event) { LogStash::Event.new({"[@metadata][_ingest_document][id]" => "meta-document-id"}) } + + it "use the _id provided by the integration" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + end + end + end + describe "with auth" do let(:user) { "myuser" } let(:password) { ::LogStash::Util::Password.new("mypassword") } From 1002ee499f5dab2eeac2724a4bd8faf99527ee68 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 25 Oct 2023 15:37:36 +0200 Subject: [PATCH 02/11] Refactoring: extracted index and document resolution methods --- lib/logstash/outputs/elasticsearch.rb | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 897ffcf3..a564b440 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -540,10 +540,9 @@ def initialize(bad_action) # @return Hash (initial) parameters for given event # @private shared event params factory between index and data_stream mode def common_event_params(event) - sprintf_index = @event_target.call(event) - raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation + sprintf_index = resolve_index!(event) params = { - :_id => @document_id ? event.sprintf(@document_id) : nil, + :_id => resolve_document_id(event), :_index => sprintf_index, routing_field_name => @routing ? event.sprintf(@routing) : nil } @@ -559,6 +558,18 @@ def common_event_params(event) params end + def resolve_document_id(event) + @document_id ? event.sprintf(@document_id) : nil + end + private :resolve_document_id + + def resolve_index!(event) + sprintf_index = @event_target.call(event) + raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation + sprintf_index + end + private :resolve_index! + def resolve_pipeline(event) pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s pipeline_template && event.sprintf(pipeline_template) From 69f17bcfea5d77492dfe2b2c685ed569f2cbcee7 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 25 Oct 2023 17:59:34 +0200 Subject: [PATCH 03/11] Changed event_target provider closure to use _ingest_document metadata during the creation of id and index for each event --- lib/logstash/outputs/elasticsearch.rb | 21 ++++++++++----- spec/unit/outputs/elasticsearch_spec.rb | 35 ++++++++++++++++++++----- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index a564b440..6e8de699 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -540,10 +540,12 @@ def initialize(bad_action) # @return Hash (initial) parameters for given event # @private shared event params factory between index and data_stream mode def common_event_params(event) - sprintf_index = resolve_index!(event) + event_control = event.get("[@metadata][_ingest_document]") + event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil + params = { - :_id => resolve_document_id(event), - :_index => sprintf_index, + :_id => resolve_document_id(event, event_id), + :_index => resolve_index!(event, event_index), routing_field_name => @routing ? event.sprintf(@routing) : nil } @@ -558,15 +560,20 @@ def common_event_params(event) params end - def resolve_document_id(event) - @document_id ? event.sprintf(@document_id) : nil + def resolve_document_id(event, event_id) + return event.sprintf(@document_id) if @document_id + return event_id || nil end private :resolve_document_id - def resolve_index!(event) + def resolve_index!(event, event_index) sprintf_index = @event_target.call(event) raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation - sprintf_index + # if its not a data stream, sprintf_index is the @index with resolved placeholders. + # if its a data stream, sprintf_index could be either the name of a data stream or the value contained in + # @index without placeholders substitution. In any case if event's metadata index is provided takes precedence + # on datastream name or whaterver is returned by the event_target provider. + return event_index || sprintf_index end private :resolve_index! diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 0be27d00..a04f8739 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -272,19 +272,42 @@ end describe "with event integration metadata" do - context "when user doesn't specify index setting and the event present an index field in metadata" do - let(:event) { LogStash::Event.new({"[@metadata][_ingest_document][index]" => "meta-document-index"}) } + context "when user doesn't specify index setting and the event contains an index field in metadata" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } it "use the index provided by the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end - context "when user doesn't specify document_id setting and the event's contains one" do - let(:event) { LogStash::Event.new({"[@metadata][_ingest_document][id]" => "meta-document-id"}) } + context "when datastream is provided" do +# let(:event) { LogStash::Event.new({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - it "use the _id provided by the integration" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + context "event contains an index field in metadata" do + let(:event) { LogStash::Event.new({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}, + "@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "use the index provided by the integration" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + end + + context "when user doesn't specify document_id setting" do + context "event's contains one" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } + + it "use the _id provided by the integration" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + end + end + + context "event doesn't contains one" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {}}}) } + + it "use the _id provided by the integration" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => nil) + end end end end From 24ef84d7da17a2111097f518d8d565fc91f580f3 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 26 Oct 2023 17:23:38 +0200 Subject: [PATCH 04/11] Use pipeline name provided from integration execution if present in metadata --- lib/logstash/outputs/elasticsearch.rb | 5 +++-- spec/unit/outputs/elasticsearch_spec.rb | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 6e8de699..895fffca 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -549,7 +549,7 @@ def common_event_params(event) routing_field_name => @routing ? event.sprintf(@routing) : nil } - target_pipeline = resolve_pipeline(event) + target_pipeline = resolve_pipeline(event, event_pipeline) # convention: empty string equates to not using a pipeline # this is useful when using a field reference in the pipeline setting, e.g. # elasticsearch { @@ -577,7 +577,8 @@ def resolve_index!(event, event_index) end private :resolve_index! - def resolve_pipeline(event) + def resolve_pipeline(event, event_pipeline) + return event_pipeline if event_pipeline && !@pipeline pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s pipeline_template && event.sprintf(pipeline_template) end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index a04f8739..bf470cdf 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -792,6 +792,24 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end + context "when event contains also _ingest_document pipeline name" do + let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", + "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", + "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + + it "the one provided by user takes precedence on all the others" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") + end + + context "when settings doesn't configure a pipeline and integration provides one in the event" do + let(:options) { { } } + + it "the one provided by user takes precedence on all the others" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + end + end + context "when the plugin's `pipeline` is constant" do let(:options) { super().merge("pipeline" => "my-constant-pipeline") } it "uses plugin's pipeline value" do From 2f58a53a3d8249578d9ef31516c0a525113183ba Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 27 Oct 2023 12:08:52 +0200 Subject: [PATCH 05/11] [Test] Rephrased test descriptions --- lib/logstash/outputs/elasticsearch.rb | 8 ++++---- spec/unit/outputs/elasticsearch_spec.rb | 24 +++++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 895fffca..f0c53db4 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -569,10 +569,10 @@ def resolve_document_id(event, event_id) def resolve_index!(event, event_index) sprintf_index = @event_target.call(event) raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation - # if its not a data stream, sprintf_index is the @index with resolved placeholders. - # if its a data stream, sprintf_index could be either the name of a data stream or the value contained in - # @index without placeholders substitution. In any case if event's metadata index is provided takes precedence - # on datastream name or whaterver is returned by the event_target provider. + # if it's not a data stream, sprintf_index is the @index with resolved placeholders. + # if is a data stream, sprintf_index could be either the name of a data stream or the value contained in + # @index without placeholders substitution. If event's metadata index is provided, it takes precedence + # on datastream name or whatever is returned by the event_target provider. return event_index || sprintf_index end private :resolve_index! diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index bf470cdf..0d6390c3 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -272,40 +272,38 @@ end describe "with event integration metadata" do - context "when user doesn't specify index setting and the event contains an index field in metadata" do + context "when there isn't any index setting specified and the event contains an integration metadata index" do let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } - it "use the index provided by the integration" do + it "precedence is given to the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end - context "when datastream is provided" do -# let(:event) { LogStash::Event.new({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - - context "event contains an index field in metadata" do + context "when datastream is used" do + context "event contains an integration metadata index" do let(:event) { LogStash::Event.new({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}, "@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } - it "use the index provided by the integration" do + it "precedence is given to the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end end - context "when user doesn't specify document_id setting" do - context "event's contains one" do + context "when there isn't any document_id setting" do + context "event contains an integration metadata id" do let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } - it "use the _id provided by the integration" do + it "precedence is given to the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") end end - context "event doesn't contains one" do + context "event doesn't contain an integration metadata id" do let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {}}}) } - it "use the _id provided by the integration" do + it "let Elasticsearch to assign one" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => nil) end end @@ -797,7 +795,7 @@ "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } - it "the one provided by user takes precedence on all the others" do + it "precedence is given to the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end From dc74110732a0ced0abbdaba33f7fa819f8d0eb97 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 14:12:42 +0100 Subject: [PATCH 06/11] Reshaped unit test for index replacement, to cover all cases --- spec/unit/outputs/elasticsearch_spec.rb | 70 +++++++++++++++++++++---- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 0d6390c3..e6365460 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -272,22 +272,72 @@ end describe "with event integration metadata" do - context "when there isn't any index setting specified and the event contains an integration metadata index" do - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + let(:event_fields) {{}} + let(:event) { LogStash::Event.new(event_fields)} - it "precedence is given to the integration" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + context "when plugin's index is specified" do + let(:options) { super().merge("index" => "index_from_settings")} + + context "when the event contains an integration metadata index" do + let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "plugin's index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") + end end - end - context "when datastream is used" do - context "event contains an integration metadata index" do - let(:event) { LogStash::Event.new({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}, - "@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + context "when the event doesn't contains an integration metadata index" do + it "plugin's index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") + end + end + end - it "precedence is given to the integration" do + context "when plugin's index is NOT specified" do + let(:options) { super().merge("index" => nil)} + + context "when the event contains an integration metadata index" do + let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "plugin's configuration metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end + + context "when datastream settings are NOT configured" do + it "plugin's configuration metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "plugin's configuration metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + end + + context "when the event DOESN'T contain integration metadata index" do + let(:default_index_resolved) { event.sprintf(subject.default_index) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + + context "when datastream settings are NOT configured" do + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end end end From cebba1a4b5d5dfcb7dbb455116d9d019e01c2cad Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 14:53:42 +0100 Subject: [PATCH 07/11] Fixed failing tests that proove the precedence to user setttings for index --- lib/logstash/outputs/elasticsearch.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index f0c53db4..213ba7d1 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -573,7 +573,8 @@ def resolve_index!(event, event_index) # if is a data stream, sprintf_index could be either the name of a data stream or the value contained in # @index without placeholders substitution. If event's metadata index is provided, it takes precedence # on datastream name or whatever is returned by the event_target provider. - return event_index || sprintf_index + return event_index if @index == @default_index && event_index + return sprintf_index end private :resolve_index! From 967557288350e456a0bd0f1513775df69a429562 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 15:36:48 +0100 Subject: [PATCH 08/11] Reshaped tests for document_id to cover all cases --- spec/unit/outputs/elasticsearch_spec.rb | 33 +++++++++++++++++++------ 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index e6365460..c169c589 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -341,24 +341,41 @@ end end - context "when there isn't any document_id setting" do - context "event contains an integration metadata id" do + context "when plugin's document_id is specified" do + let(:options) { super().merge("document_id" => "id_from_settings")} + + context "when the event contains an integration metadata document_id" do let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } - it "precedence is given to the integration" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + it "plugin's document_id is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings") end end - context "event doesn't contain an integration metadata id" do - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {}}}) } + context "when the event DOESN'T contains an integration metadata document_id" do + it "plugin's document_id is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings") + end + end + end + + context "when plugin's document_id is NOT specified" do + let(:options) { super().merge("document_id" => nil)} - it "let Elasticsearch to assign one" do + context "when the event contains an integration metadata document_id" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } + + it "plugin's configuration metadata document_id is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + end + end + + context "when the event DOESN'T contains an integration metadata document_id" do + it "plugin's default id mechanism is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => nil) end end end - end describe "with auth" do let(:user) { "myuser" } From 3e8ec2d6209c964723a96f35acb6374fc369c239 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 16:42:30 +0100 Subject: [PATCH 09/11] Added all cases for metadata's ingest pipeline --- spec/unit/outputs/elasticsearch_spec.rb | 37 +++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index c169c589..c20287f6 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -377,6 +377,43 @@ end end + context "when plugin's pipeline is specified" do + let(:options) { {"pipeline" => "pipeline_from_settings" } } + + context "when the event contains an integration metadata pipeline" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + + it "plugin's pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings") + end + end + + context "when the event DOESN'T contains an integration metadata pipeline" do + it "plugin's pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings") + end + end + end + + context "when plugin's pipeline is NOT specified" do + let(:options) { super().merge("pipeline" => nil)} + + context "when the event contains an integration metadata pipeline" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + + it "event's metadata pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + end + + context "when the event DOESN'T contains an integration metadata pipeline" do + it "plugin's default pipeline mechanism is used" do + expect(subject.send(:event_action_tuple, event)[1]).to_not have_key(:pipeline) + end + end + end + end + describe "with auth" do let(:user) { "myuser" } let(:password) { ::LogStash::Util::Password.new("mypassword") } From 87c13cecf4a762328e55e92d7976383ac3a876e7 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 17:45:29 +0100 Subject: [PATCH 10/11] Moved test about target_ingest_pipeline and ingest_document into the proper context --- spec/unit/outputs/elasticsearch_spec.rb | 37 ++++++++++--------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index c20287f6..f7be612a 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -299,12 +299,12 @@ context "when the event contains an integration metadata index" do let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end context "when datastream settings are NOT configured" do - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end @@ -312,7 +312,7 @@ context "when datastream settings are configured" do let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end @@ -365,7 +365,7 @@ context "when the event contains an integration metadata document_id" do let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } - it "plugin's configuration metadata document_id is used" do + it "event's metadata document_id is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") end end @@ -399,11 +399,20 @@ let(:options) { super().merge("pipeline" => nil)} context "when the event contains an integration metadata pipeline" do - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + let(:metadata) { {"_ingest_document" => {"pipeline" => "integration-pipeline"}} } + let(:event) { LogStash::Event.new({"@metadata" => metadata}) } it "event's metadata pipeline is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") end + + context "when also target_ingest_pipeline id defined" do + let(:metadata) { super().merge({"target_ingest_pipeline" => "meta-ingest-pipeline"}) } + + it "then event's pipeline from _ingest_document is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + end end context "when the event DOESN'T contains an integration metadata pipeline" do @@ -894,24 +903,6 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end - context "when event contains also _ingest_document pipeline name" do - let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", - "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", - "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } - - it "precedence is given to the integration" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") - end - - context "when settings doesn't configure a pipeline and integration provides one in the event" do - let(:options) { { } } - - it "the one provided by user takes precedence on all the others" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") - end - end - end - context "when the plugin's `pipeline` is constant" do let(:options) { super().merge("pipeline" => "my-constant-pipeline") } it "uses plugin's pipeline value" do From 583af0e279a7470a05d251371d4e7c0b80dff847 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 8 Nov 2023 12:21:09 +0100 Subject: [PATCH 11/11] Bumped version --- CHANGELOG.md | 3 +++ logstash-output-elasticsearch.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4e265b5..e030c862 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.21.0 + - Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155) + ## 11.20.1 - Doc: Replace `document_already_exist_exception` with `version_conflict_engine_exception` in the `silence_errors_in_log` setting example [#1159](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1159) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 3dd9b918..3d0cae46 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.20.1' + s.version = '11.21.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"