From f0606c5867aef516ca5c663b7f74c37d51cbbccb Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 26 Oct 2023 17:23:38 +0200 Subject: [PATCH 01/15] Use pipeline name provided from integration execution if present in metadata --- spec/unit/outputs/elasticsearch_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index f7be612a..1d0ae7fd 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -903,6 +903,24 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end + context "when event contains also _ingest_document pipeline name" do + let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", + "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", + "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + + it "the one provided by user takes precedence on all the others" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") + end + + context "when settings doesn't configure a pipeline and integration provides one in the event" do + let(:options) { { } } + + it "the one provided by user takes precedence on all the others" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + end + end + context "when the plugin's `pipeline` is constant" do let(:options) { super().merge("pipeline" => "my-constant-pipeline") } it "uses plugin's pipeline value" do From 2677982a0516ae2dbd4e6cd5c41a265880b265a4 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 27 Oct 2023 12:08:52 +0200 Subject: [PATCH 02/15] [Test] Rephrased test descriptions --- spec/unit/outputs/elasticsearch_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 1d0ae7fd..c6e42690 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -908,7 +908,7 @@ "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } - it "the one provided by user takes precedence on all the others" do + it "precedence is given to the integration" do expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end From 47181d75a1b170ceb782251d913cfb4a2ec4a1a1 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 14:12:42 +0100 Subject: [PATCH 03/15] Reshaped unit test for index replacement, to cover all cases --- spec/unit/outputs/elasticsearch_spec.rb | 47 +++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index c6e42690..66c39275 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -291,6 +291,53 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") end end + + context "when plugin's index is NOT specified" do + let(:options) { super().merge("index" => nil)} + + context "when the event contains an integration metadata index" do + let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "plugin's configuration metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + + context "when datastream settings are NOT configured" do + it "plugin's configuration metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "plugin's configuration metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + end + + context "when the event DOESN'T contain integration metadata index" do + let(:default_index_resolved) { event.sprintf(subject.default_index) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + + context "when datastream settings are NOT configured" do + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end + end end context "when plugin's index is NOT specified" do From 56b359c85278c749b4be4adda475f5a748c8d74b Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 6 Nov 2023 17:45:29 +0100 Subject: [PATCH 04/15] Moved test about target_ingest_pipeline and ingest_document into the proper context --- spec/unit/outputs/elasticsearch_spec.rb | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 66c39275..89d7f27c 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -298,12 +298,12 @@ context "when the event contains an integration metadata index" do let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end context "when datastream settings are NOT configured" do - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end @@ -311,7 +311,7 @@ context "when datastream settings are configured" do let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - it "plugin's configuration metadata index is used" do + it "event's metadata index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") end end @@ -950,24 +950,6 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") end - context "when event contains also _ingest_document pipeline name" do - let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", - "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline", - "_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } - - it "precedence is given to the integration" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline") - end - - context "when settings doesn't configure a pipeline and integration provides one in the event" do - let(:options) { { } } - - it "the one provided by user takes precedence on all the others" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") - end - end - end - context "when the plugin's `pipeline` is constant" do let(:options) { super().merge("pipeline" => "my-constant-pipeline") } it "uses plugin's pipeline value" do From e50c2081734c4d393d305eefb55e57cd0dc13f1d Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Nov 2023 11:44:24 +0100 Subject: [PATCH 05/15] Extracted routing param resolution in separate method --- lib/logstash/outputs/elasticsearch.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 213ba7d1..cc2e581a 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -541,12 +541,12 @@ def initialize(bad_action) # @private shared event params factory between index and data_stream mode def common_event_params(event) event_control = event.get("[@metadata][_ingest_document]") - event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil + event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index", "routing") rescue nil params = { :_id => resolve_document_id(event, event_id), :_index => resolve_index!(event, event_index), - routing_field_name => @routing ? event.sprintf(@routing) : nil + routing_field_name => resolve_routing(event) } target_pipeline = resolve_pipeline(event, event_pipeline) @@ -560,6 +560,11 @@ def common_event_params(event) params end + def resolve_routing(event) + @routing ? event.sprintf(@routing) : nil + end + private :resolve_routing + def resolve_document_id(event, event_id) return event.sprintf(@document_id) if @document_id return event_id || nil From 3e5924d4c848b9d75ce46f9bf3dd2e2b9dcea849 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Nov 2023 13:54:35 +0100 Subject: [PATCH 06/15] Use routing setting from ingest_document metadata, if present --- lib/logstash/outputs/elasticsearch.rb | 7 ++++--- spec/unit/outputs/elasticsearch_spec.rb | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index cc2e581a..08ffaf13 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -541,12 +541,12 @@ def initialize(bad_action) # @private shared event params factory between index and data_stream mode def common_event_params(event) event_control = event.get("[@metadata][_ingest_document]") - event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index", "routing") rescue nil + event_id, event_pipeline, event_index, event_routing = event_control&.values_at("id","pipeline","index", "routing") rescue nil params = { :_id => resolve_document_id(event, event_id), :_index => resolve_index!(event, event_index), - routing_field_name => resolve_routing(event) + routing_field_name => resolve_routing(event, event_routing) } target_pipeline = resolve_pipeline(event, event_pipeline) @@ -560,7 +560,8 @@ def common_event_params(event) params end - def resolve_routing(event) + def resolve_routing(event, event_routing) + return event_routing if event_routing && !@routing @routing ? event.sprintf(@routing) : nil end private :resolve_routing diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 89d7f27c..11a3c6f8 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -275,6 +275,25 @@ let(:event_fields) {{}} let(:event) { LogStash::Event.new(event_fields)} + context "which contains routing field in its metadata" do + # defines an event with routing in the integration metadata section + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } + + context "when routing is specified in plugin settings" do + let(:options) { super().merge("routing" => "settings_routing")} + + it "takes precedence over the integration one" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing") + end + end + + context "when routing is not defined in plugin settings" do + it "must use the value from the integration's metadata" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "meta-document-routing") + end + end + end + context "when plugin's index is specified" do let(:options) { super().merge("index" => "index_from_settings")} From e6eed9f6db9afa7c1ae29e5e7d6ebedf06fc25be Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Nov 2023 14:54:46 +0100 Subject: [PATCH 07/15] Moved the assignment of version and version_type into common_event_params method --- lib/logstash/outputs/elasticsearch.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 08ffaf13..2f737b4e 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -499,9 +499,6 @@ def event_action_tuple(event) params[retry_on_conflict_action_name] = @retry_on_conflict end - params[:version] = event.sprintf(@version) if @version - params[:version_type] = event.sprintf(@version_type) if @version_type - EventActionTuple.new(action, params, event) end @@ -557,6 +554,9 @@ def common_event_params(event) # } params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?) + params[:version] = event.sprintf(@version) if @version + params[:version_type] = event.sprintf(@version_type) if @version_type + params end From 7560c51b5e34a873e6de1e6c0e273263c7899f59 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Nov 2023 15:21:00 +0100 Subject: [PATCH 08/15] Use version setting from ingest_document metadata, if present --- lib/logstash/outputs/elasticsearch.rb | 10 ++++++++-- spec/unit/outputs/elasticsearch_spec.rb | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 2f737b4e..c3c268b0 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -538,7 +538,7 @@ def initialize(bad_action) # @private shared event params factory between index and data_stream mode def common_event_params(event) event_control = event.get("[@metadata][_ingest_document]") - event_id, event_pipeline, event_index, event_routing = event_control&.values_at("id","pipeline","index", "routing") rescue nil + event_id, event_pipeline, event_index, event_routing, event_version = event_control&.values_at("id","pipeline","index", "routing", "version") rescue nil params = { :_id => resolve_document_id(event, event_id), @@ -554,12 +554,18 @@ def common_event_params(event) # } params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?) - params[:version] = event.sprintf(@version) if @version + params[:version] = resolve_version(event, event_version) params[:version_type] = event.sprintf(@version_type) if @version_type params end + def resolve_version(event, event_version) + return event_version if event_version && !@version + event.sprintf(@version) if @version + end + private :resolve_version + def resolve_routing(event, event_routing) return event_routing if event_routing && !@routing @routing ? event.sprintf(@routing) : nil diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 11a3c6f8..a6b30006 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -275,6 +275,25 @@ let(:event_fields) {{}} let(:event) { LogStash::Event.new(event_fields)} + context "which contains version field" do + # defines an event with version in the integration metadata section + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) } + + context "when version is also specified in plugin settings" do + let(:options) { super().merge("version" => "123")} + + it "takes precedence over the integration one" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123") + end + end + + context "when version is not defined in plugin settings" do + it "must use the value from the integration's metadata" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456") + end + end + end + context "which contains routing field in its metadata" do # defines an event with routing in the integration metadata section let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } @@ -294,11 +313,16 @@ end end +<<<<<<< HEAD context "when plugin's index is specified" do let(:options) { super().merge("index" => "index_from_settings")} context "when the event contains an integration metadata index" do let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } +======= + context "when there isn't any index setting specified and the event contains an integration metadata index" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } +>>>>>>> 332e6e3 (Use version setting from ingest_document metadata, if present) it "plugin's index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") From 1d137a5a53b568baf407ff01fb881372d2226095 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Nov 2023 15:28:01 +0100 Subject: [PATCH 09/15] Use version_type setting from ingest_document metadata, if present --- lib/logstash/outputs/elasticsearch.rb | 10 ++++++++-- spec/unit/outputs/elasticsearch_spec.rb | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index c3c268b0..8ac4e33c 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -538,7 +538,7 @@ def initialize(bad_action) # @private shared event params factory between index and data_stream mode def common_event_params(event) event_control = event.get("[@metadata][_ingest_document]") - event_id, event_pipeline, event_index, event_routing, event_version = event_control&.values_at("id","pipeline","index", "routing", "version") rescue nil + event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil params = { :_id => resolve_document_id(event, event_id), @@ -555,7 +555,7 @@ def common_event_params(event) params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?) params[:version] = resolve_version(event, event_version) - params[:version_type] = event.sprintf(@version_type) if @version_type + params[:version_type] = resolve_version_type(event, event_version_type) params end @@ -566,6 +566,12 @@ def resolve_version(event, event_version) end private :resolve_version + def resolve_version_type(event, event_version_type) + return event_version_type if event_version_type && !@version_type + event.sprintf(@version_type) if @version_type + end + private :resolve_version_type + def resolve_routing(event, event_routing) return event_routing if event_routing && !@routing @routing ? event.sprintf(@routing) : nil diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index a6b30006..b3eccd15 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -294,6 +294,25 @@ end end + context "which contains version_type field" do + # defines an event with version_type in the integration metadata section + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) } + + context "when version_type is also specified in plugin settings" do + let(:options) { super().merge("version_type" => "internal")} + + it "takes precedence over the integration one" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal") + end + end + + context "when version_type is not defined in plugin settings" do + it "must use the value from the integration's metadata" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "external") + end + end + end + context "which contains routing field in its metadata" do # defines an event with routing in the integration metadata section let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } From e947f0691c1c797a4fa27ea6ac1cfa7cbdb64abc Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 7 Nov 2023 09:11:53 +0100 Subject: [PATCH 10/15] Removed badly resolved merge commit --- spec/unit/outputs/elasticsearch_spec.rb | 5 ----- 1 file changed, 5 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index b3eccd15..77c7281c 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -332,16 +332,11 @@ end end -<<<<<<< HEAD context "when plugin's index is specified" do let(:options) { super().merge("index" => "index_from_settings")} context "when the event contains an integration metadata index" do let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } -======= - context "when there isn't any index setting specified and the event contains an integration metadata index" do - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } ->>>>>>> 332e6e3 (Use version setting from ingest_document metadata, if present) it "plugin's index is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") From 17dfff73ce340bfcc541c6774a6cbb3edfb4cdbb Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 7 Nov 2023 09:54:14 +0100 Subject: [PATCH 11/15] Extended test suite to cover all the cases as in originating PR --- spec/unit/outputs/elasticsearch_spec.rb | 93 ++++++++++++++++++------- 1 file changed, 69 insertions(+), 24 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 77c7281c..da2cfc56 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -275,61 +275,106 @@ let(:event_fields) {{}} let(:event) { LogStash::Event.new(event_fields)} - context "which contains version field" do - # defines an event with version in the integration metadata section - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) } + context "when plugin's version is specified" do + let(:options) { super().merge("version" => "123")} - context "when version is also specified in plugin settings" do - let(:options) { super().merge("version" => "123")} + context "when the event contains an integration metadata version" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) } - it "takes precedence over the integration one" do + it "plugin's version is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123") end end - context "when version is not defined in plugin settings" do - it "must use the value from the integration's metadata" do + context "when the event DOESN'T contains an integration metadata version" do + it "plugin's version is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123") + end + end + end + + context "when plugin's version is NOT specified" do + context "when the event contains an integration metadata version" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) } + + it "event's metadata version is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456") end end + + context "when the event DOESN'T contain an integration metadata version" do + it "plugin's default id mechanism is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version => nil) + end + end end - context "which contains version_type field" do - # defines an event with version_type in the integration metadata section - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) } + context "when plugin's version_type is specified" do + let(:options) { super().merge("version_type" => "internal")} - context "when version_type is also specified in plugin settings" do - let(:options) { super().merge("version_type" => "internal")} + context "when the event contains an integration metadata version_type" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) } - it "takes precedence over the integration one" do + it "plugin's version_type is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal") end end - context "when version_type is not defined in plugin settings" do - it "must use the value from the integration's metadata" do + context "when the event DOESN'T contains an integration metadata version_type" do + it "plugin's version_type is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal") + end + end + end + + context "when plugin's version_type is NOT specified" do + context "when the event contains an integration metadata version_type" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) } + + it "event's metadata version_type is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "external") end end + + context "when the event DOESN'T contain an integration metadata version_type" do + it "plugin's default id mechanism is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => nil) + end + end end - context "which contains routing field in its metadata" do - # defines an event with routing in the integration metadata section - let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } + context "when plugin's routing is specified" do + let(:options) { super().merge("routing" => "settings_routing")} - context "when routing is specified in plugin settings" do - let(:options) { super().merge("routing" => "settings_routing")} + context "when the event contains an integration metadata routing" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } - it "takes precedence over the integration one" do + it "plugin's routing is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing") end end - context "when routing is not defined in plugin settings" do - it "must use the value from the integration's metadata" do + context "when the event DOESN'T contains an integration metadata routing" do + it "plugin's routing is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing") + end + end + end + + context "when plugin's routing is NOT specified" do + context "when the event contains an integration metadata routing" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) } + + it "event's metadata routing is used" do expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "meta-document-routing") end end + + context "when the event DOESN'T contain an integration metadata routing" do + it "plugin's default id mechanism is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil) + end + end end context "when plugin's index is specified" do From ed367e46616aef15a07074edaef7dd9f1777d98c Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 10 Nov 2023 10:33:38 +0100 Subject: [PATCH 12/15] Removed duplication of tests --- spec/unit/outputs/elasticsearch_spec.rb | 47 ------------------------- 1 file changed, 47 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index da2cfc56..807bb2b2 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -393,53 +393,6 @@ expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") end end - - context "when plugin's index is NOT specified" do - let(:options) { super().merge("index" => nil)} - - context "when the event contains an integration metadata index" do - let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } - - it "event's metadata index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") - end - - context "when datastream settings are NOT configured" do - it "event's metadata index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") - end - end - - context "when datastream settings are configured" do - let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - - it "event's metadata index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") - end - end - end - - context "when the event DOESN'T contain integration metadata index" do - let(:default_index_resolved) { event.sprintf(subject.default_index) } - - it "default index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) - end - - context "when datastream settings are NOT configured" do - it "default index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) - end - end - - context "when datastream settings are configured" do - let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } - - it "default index is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) - end - end - end end context "when plugin's index is NOT specified" do From 6508c0ed4d3a4957fc943dd9335777c3d45e430b Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 10 Nov 2023 10:47:05 +0100 Subject: [PATCH 13/15] [Skip CI] Bumped version --- CHANGELOG.md | 3 +++ logstash-output-elasticsearch.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e030c862..36b797be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.22.0 + - Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158) + ## 11.21.0 - Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 3d0cae46..2c4e6b04 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.21.0' + s.version = '11.22.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From cc45a0a0fcc30c8ed7473456766d7bda14625783 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Thu, 16 Nov 2023 09:12:06 +0100 Subject: [PATCH 14/15] Avoid to store nil values for version_type, version and routing Adding `version => nil` is a change in behaviour because, actually it's never added, so this commit respect the existing logic, do not add event's keys that are empty Co-authored-by: Ry Biesemeyer --- lib/logstash/outputs/elasticsearch.rb | 3 ++- spec/unit/outputs/elasticsearch_spec.rb | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 8ac4e33c..81fd7229 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -557,7 +557,8 @@ def common_event_params(event) params[:version] = resolve_version(event, event_version) params[:version_type] = resolve_version_type(event, event_version_type) - params + # purge nil valued key-value pairs + params.compact end def resolve_version(event, event_version) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 807bb2b2..7e9a4d62 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -304,7 +304,7 @@ context "when the event DOESN'T contain an integration metadata version" do it "plugin's default id mechanism is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:version => nil) + expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version) end end end @@ -338,7 +338,7 @@ context "when the event DOESN'T contain an integration metadata version_type" do it "plugin's default id mechanism is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => nil) + expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version_type) end end end @@ -372,7 +372,7 @@ context "when the event DOESN'T contain an integration metadata routing" do it "plugin's default id mechanism is used" do - expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil) + expect(subject.send(:event_action_tuple, event)[1]).to_not include(:routing) end end end From e91210c084d6e494e7df3d7011743947d6205b73 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 16 Nov 2023 11:03:53 +0100 Subject: [PATCH 15/15] The existing behavior is to keep routing => nil in event fields. This commit rollback the changes to satisfy that condition --- lib/logstash/outputs/elasticsearch.rb | 10 ++++++---- spec/unit/outputs/elasticsearch_spec.rb | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 81fd7229..598265d9 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -554,11 +554,13 @@ def common_event_params(event) # } params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?) - params[:version] = resolve_version(event, event_version) - params[:version_type] = resolve_version_type(event, event_version_type) + resolved_version = resolve_version(event, event_version) + resolved_version_type = resolve_version_type(event, event_version_type) + # avoid to add nil valued key-value pairs + params[:version] = resolved_version unless resolved_version.nil? + params[:version_type] = resolved_version_type unless resolved_version_type.nil? - # purge nil valued key-value pairs - params.compact + params end def resolve_version(event, event_version) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 7e9a4d62..171acd92 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -372,7 +372,7 @@ context "when the event DOESN'T contain an integration metadata routing" do it "plugin's default id mechanism is used" do - expect(subject.send(:event_action_tuple, event)[1]).to_not include(:routing) + expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil) end end end