diff --git a/CHANGELOG.md b/CHANGELOG.md index c4e265b5..e030c862 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.21.0 + - Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155) + ## 11.20.1 - Doc: Replace `document_already_exist_exception` with `version_conflict_engine_exception` in the `silence_errors_in_log` setting example [#1159](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1159) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 897ffcf3..213ba7d1 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -540,15 +540,16 @@ def initialize(bad_action) # @return Hash (initial) parameters for given event # @private shared event params factory between index and data_stream mode def common_event_params(event) - sprintf_index = @event_target.call(event) - raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation + event_control = event.get("[@metadata][_ingest_document]") + event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil + params = { - :_id => @document_id ? event.sprintf(@document_id) : nil, - :_index => sprintf_index, + :_id => resolve_document_id(event, event_id), + :_index => resolve_index!(event, event_index), routing_field_name => @routing ? event.sprintf(@routing) : nil } - target_pipeline = resolve_pipeline(event) + target_pipeline = resolve_pipeline(event, event_pipeline) # convention: empty string equates to not using a pipeline # this is useful when using a field reference in the pipeline setting, e.g. 
# elasticsearch { @@ -559,7 +560,26 @@ def common_event_params(event) params end - def resolve_pipeline(event) + def resolve_document_id(event, event_id) + return event.sprintf(@document_id) if @document_id + return event_id || nil + end + private :resolve_document_id + + def resolve_index!(event, event_index) + sprintf_index = @event_target.call(event) + raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation + # if it's not a data stream, sprintf_index is the @index with resolved placeholders. + # if it is a data stream, sprintf_index could be either the name of a data stream or the value contained in + # @index without placeholder substitution. If event's metadata index is provided, it takes precedence + # over the datastream name or whatever is returned by the event_target provider. + return event_index if @index == @default_index && event_index + return sprintf_index + end + private :resolve_index! + + def resolve_pipeline(event, event_pipeline) + return event_pipeline if event_pipeline && !@pipeline pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s pipeline_template && event.sprintf(pipeline_template) end diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 3dd9b918..3d0cae46 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.20.1' + s.version = '11.21.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 56df89ab..f7be612a 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -271,6 +271,158 @@ end end + describe "with event integration metadata" do + let(:event_fields) {{}} + let(:event) { LogStash::Event.new(event_fields)} + + context "when plugin's index is specified" do + let(:options) { super().merge("index" => "index_from_settings")} + + context "when the event contains an integration metadata index" do + let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "plugin's index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") + end + end + + context "when the event doesn't contain an integration metadata index" do + it "plugin's index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings") + end + end + end + + context "when plugin's index is NOT specified" do + let(:options) { super().merge("index" => nil)} + + context "when the event contains an integration metadata index" do + let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) } + + it "event's metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + + context "when datastream settings are NOT configured" do + it "event's metadata index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "event's metadata index is used" do + 
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index") + end + end + end + + context "when the event DOESN'T contain an integration metadata index" do + let(:default_index_resolved) { event.sprintf(subject.default_index) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + + context "when datastream settings are NOT configured" do + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end + + context "when datastream settings are configured" do + let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) } + + it "default index is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved) + end + end + end + end + + context "when plugin's document_id is specified" do + let(:options) { super().merge("document_id" => "id_from_settings")} + + context "when the event contains an integration metadata document_id" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } + + it "plugin's document_id is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings") + end + end + + context "when the event DOESN'T contain an integration metadata document_id" do + it "plugin's document_id is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings") + end + end + end + + context "when plugin's document_id is NOT specified" do + let(:options) { super().merge("document_id" => nil)} + + context "when the event contains an integration metadata document_id" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) } + + it "event's metadata document_id is used" do + 
expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id") + end + end + + context "when the event DOESN'T contain an integration metadata document_id" do + it "plugin's default id mechanism is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => nil) + end + end + end + + context "when plugin's pipeline is specified" do + let(:options) { {"pipeline" => "pipeline_from_settings" } } + + context "when the event contains an integration metadata pipeline" do + let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"pipeline" => "integration-pipeline"}}}) } + + it "plugin's pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings") + end + end + + context "when the event DOESN'T contain an integration metadata pipeline" do + it "plugin's pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings") + end + end + end + + context "when plugin's pipeline is NOT specified" do + let(:options) { super().merge("pipeline" => nil)} + + context "when the event contains an integration metadata pipeline" do + let(:metadata) { {"_ingest_document" => {"pipeline" => "integration-pipeline"}} } + let(:event) { LogStash::Event.new({"@metadata" => metadata}) } + + it "event's metadata pipeline is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + + context "when also target_ingest_pipeline is defined" do + let(:metadata) { super().merge({"target_ingest_pipeline" => "meta-ingest-pipeline"}) } + + it "then event's pipeline from _ingest_document is used" do + expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline") + end + end + end + + context "when the event DOESN'T contain an integration metadata pipeline" do + it "plugin's default pipeline mechanism is used" do + 
expect(subject.send(:event_action_tuple, event)[1]).to_not have_key(:pipeline) + end + end + end + end + describe "with auth" do let(:user) { "myuser" } let(:password) { ::LogStash::Util::Password.new("mypassword") }