diff --git a/.ci/elasticsearch-run.sh b/.ci/elasticsearch-run.sh
index 9df8c88d8..c27070763 100755
--- a/.ci/elasticsearch-run.sh
+++ b/.ci/elasticsearch-run.sh
@@ -1,4 +1,4 @@
 #!/bin/bash
 set -ex
-/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node
+/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node -Eaction.destructive_requires_name=false
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77a2ea592..8f484d725 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 10.8.6
+ - Fixed an issue where a single over-size event being rejected by Elasticsearch would cause the entire batch to be retried indefinitely. The over-size event will still be retried on its own, and logging has been improved to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
+ - Fixed an issue with `http_compression => true` where a well-compressed payload could fit under our outbound 20MB limit but expand beyond Elasticsearch's 100MB limit, causing bulk failures. Bulk grouping is now determined entirely by the decompressed payload size [#823](https://github.com/logstash-plugins/logstash-output-elasticsearch/issues/823)
+ - Improved debug-level logging about bulk requests.
+
 ## 10.8.5
  - Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)
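The second changelog entry above describes grouping bulk requests by their decompressed payload size rather than by the compressed bytes on the wire. A rough sketch of that rule follows; it is not the plugin's implementation, the helper name is made up, and the constant value only mirrors the 20MB limit the entry mentions.

TARGET_BULK_BYTES = 20 * 1024 * 1024 # assumed flush threshold, per the changelog entry above

def group_by_uncompressed_size(serialized_actions, limit = TARGET_BULK_BYTES)
  batches, current, current_bytes = [], [], 0
  serialized_actions.each do |ndjson_chunk|
    # start a new batch before the *uncompressed* size would exceed the limit
    if current_bytes + ndjson_chunk.bytesize > limit && !current.empty?
      batches << current
      current, current_bytes = [], 0
    end
    current << ndjson_chunk
    current_bytes += ndjson_chunk.bytesize
  end
  batches << current unless current.empty?
  batches
end

group_by_uncompressed_size(["a" * 15_000_000, "b" * 15_000_000]).length # => 2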
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index 16b09f0cf..fe8192a2e 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -109,27 +109,50 @@ def bulk(actions)
      body_stream = StringIO.new
      if http_compression
        body_stream.set_encoding "BINARY"
-       stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
-     else
+       stream_writer = gzip_writer(body_stream)
+     else
        stream_writer = body_stream
      end
      bulk_responses = []
-     bulk_actions.each do |action|
+     batch_actions = []
+     bulk_actions.each_with_index do |action, index|
        as_json = action.is_a?(Array) ?
                    action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                    LogStash::Json.dump(action)
        as_json << "\n"
-       if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-         bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+       if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
+         stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
+         logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
+                      :action_count => batch_actions.size,
+                      :payload_size => stream_writer.pos,
+                      :content_length => body_stream.size,
+                      :batch_offset => (index + 1 - batch_actions.size))
+         bulk_responses << bulk_send(body_stream, batch_actions)
+         body_stream.truncate(0) && body_stream.seek(0)
+         stream_writer = gzip_writer(body_stream) if http_compression
+         batch_actions.clear
        end
        stream_writer.write(as_json)
+       batch_actions << action
      end
      stream_writer.close if http_compression
-     bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+     logger.debug("Sending final bulk request for batch.",
+                  :action_count => batch_actions.size,
+                  :payload_size => stream_writer.pos,
+                  :content_length => body_stream.size,
+                  :batch_offset => (actions.size - batch_actions.size))
+     bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
      body_stream.close if !http_compression
      join_bulk_responses(bulk_responses)
    end
 
+   def gzip_writer(io)
+     fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
+     fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0
+
+     Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
+   end
+
    def join_bulk_responses(bulk_responses)
      {
        "errors" => bulk_responses.any? {|r| r["errors"] == true},
@@ -137,25 +160,37 @@ def join_bulk_responses(bulk_responses)
      }
    end
 
-   def bulk_send(body_stream)
+   def bulk_send(body_stream, batch_actions)
      params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
-     # Discard the URL
      response = @pool.post(@bulk_path, params, body_stream.string)
-     if !body_stream.closed?
-       body_stream.truncate(0)
-       body_stream.seek(0)
-     end
 
      @bulk_response_metrics.increment(response.code.to_s)
 
-     if response.code != 200
+     case response.code
+     when 200 # OK
+       LogStash::Json.load(response.body)
+     when 413 # Payload Too Large
+       logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
+       emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
+     else
        url = ::LogStash::Util::SafeURI.new(response.final_url)
        raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
          response.code, url, body_stream.to_s, response.body
        )
      end
+   end
 
-     LogStash::Json.load(response.body)
+   def emulate_batch_error_response(actions, http_code, reason)
+     {
+       "errors" => true,
+       "items" => actions.map do |action|
+         action = action.first if action.is_a?(Array)
+         request_action, request_parameters = action.first
+         {
+           request_action => {"status" => http_code, "error" => { "type" => reason }}
+         }
+       end
+     }
    end
 
    def get(path)
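A rough sketch of the response shape that emulate_batch_error_response fabricates when Elasticsearch answers 413, so the existing per-action retry logic can treat the oversized batch like any other bulk error. The sample action below is made up; only the mapping logic mirrors the patch.

require 'json'

actions = [
  [{ "index" => { :_index => "logstash", :_id => nil } }, { "message" => "too big" }]
]

items = actions.map do |action|
  action = action.first if action.is_a?(Array)  # keep the action hash, drop the source document
  request_action, _params = action.first        # e.g. ["index", {:_index=>"logstash", :_id=>nil}]
  { request_action => { "status" => 413, "error" => { "type" => "payload_too_large" } } }
end

puts JSON.generate("errors" => true, "items" => items)
# {"errors":true,"items":[{"index":{"status":413,"error":{"type":"payload_too_large"}}}]}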
diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb
index 7ba2d8e22..403ce0150 100644
--- a/lib/logstash/plugin_mixins/elasticsearch/common.rb
+++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb
@@ -299,7 +299,7 @@ def safe_bulk(actions)
         retry unless @stopping.true?
       rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
         @bulk_request_metrics.increment(:failures)
-        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
+        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
         log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
         message = "Encountered a retryable error. Will Retry with exponential backoff "
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index 344f0c4a7..44b7d47ba 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '10.8.5'
+  s.version = '10.8.6'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb
index e722d6826..e79ff2265 100644
--- a/spec/unit/outputs/elasticsearch/http_client_spec.rb
+++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -204,7 +204,7 @@
   end
 
   describe "#bulk" do
-    subject { described_class.new(base_options) }
+    subject(:http_client) { described_class.new(base_options) }
 
     require "json"
     let(:message) { "hey" }
@@ -212,42 +212,61 @@
       ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
     ]}
 
-    context "if a message is over TARGET_BULK_BYTES" do
-      let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-      let(:message) { "a" * (target_bulk_bytes + 1) }
+    [true,false].each do |http_compression_enabled|
+      context "with `http_compression => #{http_compression_enabled}`" do
 
-      it "should be handled properly" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once do |data|
-          expect(data.size).to be > target_bulk_bytes
+        let(:base_options) { super().merge(:client_settings => {:http_compression => http_compression_enabled}) }
+
+        before(:each) do
+          if http_compression_enabled
+            expect(http_client).to receive(:gzip_writer).at_least(:once).and_call_original
+          else
+            expect(http_client).to_not receive(:gzip_writer)
+          end
         end
-        s = subject.send(:bulk, actions)
-      end
-    end
 
-    context "with two messages" do
-      let(:message1) { "hey" }
-      let(:message2) { "you" }
-      let(:actions) { [
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
-      ]}
-      it "executes one bulk_send operation" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once
-        s = subject.send(:bulk, actions)
-      end
+        context "if a message is over TARGET_BULK_BYTES" do
+          let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+          let(:message) { "a" * (target_bulk_bytes + 1) }
+
+          it "should be handled properly" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once do |data|
+              if !http_compression_enabled
+                expect(data.size).to be > target_bulk_bytes
+              else
+                expect(Zlib::gunzip(data.string).size).to be > target_bulk_bytes
+              end
+            end
+            s = subject.send(:bulk, actions)
+          end
+        end
+
+        context "with two messages" do
+          let(:message1) { "hey" }
+          let(:message2) { "you" }
+          let(:actions) { [
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
+          ]}
+          it "executes one bulk_send operation" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once
+            s = subject.send(:bulk, actions)
+          end
 
-      context "if one exceeds TARGET_BULK_BYTES" do
-        let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-        let(:message1) { "a" * (target_bulk_bytes + 1) }
-        it "executes two bulk_send operations" do
-          allow(subject).to receive(:join_bulk_responses)
-          expect(subject).to receive(:bulk_send).twice
-          s = subject.send(:bulk, actions)
+          context "if one exceeds TARGET_BULK_BYTES" do
+            let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+            let(:message1) { "a" * (target_bulk_bytes + 1) }
+            it "executes two bulk_send operations" do
+              allow(subject).to receive(:join_bulk_responses)
+              expect(subject).to receive(:bulk_send).twice
+              s = subject.send(:bulk, actions)
+            end
+          end
         end
-      end
-    end
+      end
+    end
   end
 
   describe "sniffing" do
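A rough sketch of the round trip the compression-enabled specs above rely on: a Zlib::GzipWriter wrapping a binary StringIO produces a payload whose uncompressed size can be checked with Zlib::gunzip. This is a standalone illustration, not the plugin's exact call sequence.

require 'zlib'
require 'stringio'

io = StringIO.new
io.set_encoding "BINARY"

writer = Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
writer.write(%({"index":{}}\n{"message":"hey"}\n))
writer.finish # finalize the gzip stream without closing the underlying StringIO

puts Zlib.gunzip(io.string).bytesize # decompressed size, as asserted in the spec above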
diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb
index 7b52ed71e..7aa20ce88 100644
--- a/spec/unit/outputs/elasticsearch_spec.rb
+++ b/spec/unit/outputs/elasticsearch_spec.rb
@@ -4,7 +4,7 @@
 require "logstash/outputs/elasticsearch"
 
 describe LogStash::Outputs::ElasticSearch do
-  subject { described_class.new(options) }
+  subject(:elasticsearch_output_instance) { described_class.new(options) }
 
   let(:options) { {} }
   let(:maximum_seen_major_version) { [1,2,5,6,7,8].sample }
@@ -265,12 +265,14 @@
     let(:event) { ::LogStash::Event.new("foo" => "bar") }
     let(:error) do
       ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-        429, double("url").as_null_object, double("request body"), double("response body")
+        429, double("url").as_null_object, request_body, double("response body")
       )
     end
     let(:logger) { double("logger").as_null_object }
     let(:response) { { :errors => [], :items => [] } }
 
+    let(:request_body) { double(:request_body, :bytesize => 1023) }
+
     before(:each) do
       i = 0
@@ -326,7 +328,7 @@
       end
 
       before(:each) do
-        allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
+        allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
          expect( stream.string ).to include '"foo":"bar1"'
          expect( stream.string ).to include '"foo":"bar2"'
        end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
@@ -350,6 +352,43 @@
       end
     end
 
+    context '413 errors' do
+      let(:payload_size) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES + 1024 }
+      let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }
+
+      let(:logger_stub) { double("logger").as_null_object }
+
+      before(:each) do
+        allow(elasticsearch_output_instance.client).to receive(:logger).and_return(logger_stub)
+
+        allow(elasticsearch_output_instance.client).to receive(:bulk).and_call_original
+
+        max_bytes = payload_size * 3 / 4 # ensure a failure first attempt
+        allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
+          if body.length > max_bytes
+            max_bytes *= 2 # ensure a successful retry
+            double("Response", :code => 413, :body => "")
+          else
+            double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
+          end
+        end
+      end
+
+      it 'retries the 413 until it goes away' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(elasticsearch_output_instance.client).to have_received(:bulk).twice
+      end
+
+      it 'logs about payload quantity and size' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(logger_stub).to have_received(:warn)
+          .with(a_string_matching(/413 Payload Too Large/),
+                hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
+      end
+    end
+
     context "with timeout set" do
       let(:listener) { Flores::Random.tcp_listener }
       let(:port) { listener[2] }