From 19f3528bd75154b287cf920a1831878132304ab4 Mon Sep 17 00:00:00 2001
From: msvticket
Date: Thu, 14 Jun 2018 11:56:34 +0200
Subject: [PATCH 1/3] If we get an HTTP status 413 on a bulk request, try
 halving target_bulk_bytes, which is also changed from a constant to an
 attribute. Solves #785

---
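Notes for reviewers: the sketch below illustrates the retry behaviour this
patch aims for. It is illustrative only; FakeClient, ES_LIMIT and
simulate_bulk are hypothetical names that appear nowhere in the patch, and
the 5MiB server limit stands in for Elasticsearch's http.max_content_length.

    # Each 413 halves the cap, so the flush size converges on whatever
    # the server accepts within a handful of retries.
    class FakeClient
      attr_accessor :target_bulk_bytes

      def initialize
        @target_bulk_bytes = 20 * 1024 * 1024 # same 20MiB starting point
      end
    end

    ES_LIMIT = 5 * 1024 * 1024 # pretend ES rejects anything larger with a 413

    def simulate_bulk(client, request_bytes, num_actions)
      attempts = 0
      loop do
        attempts += 1
        # A flush is capped at target_bulk_bytes, so halving the cap
        # shrinks the next attempt.
        size = [request_bytes, client.target_bulk_bytes].min
        break if size <= ES_LIMIT      # the server accepts the request
        break unless num_actions > 1   # a single action cannot be split further
        client.target_bulk_bytes /= 2  # mirrors the new 413 branch in common.rb
      end
      attempts
    end

    client = FakeClient.new
    puts simulate_bulk(client, 20 * 1024 * 1024, 1000) # => 3
    puts client.target_bulk_bytes # => 5242880 (20MiB -> 10MiB -> 5MiB)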
 lib/logstash/outputs/elasticsearch/common.rb  |  5 +++
 .../outputs/elasticsearch/http_client.rb      | 42 ++++++++++---------
 .../outputs/elasticsearch/http_client/pool.rb |  5 ++-
 .../outputs/elasticsearch/http_client_spec.rb | 12 +++---
 4 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb
index f02800952..08d2b0be6 100644
--- a/lib/logstash/outputs/elasticsearch/common.rb
+++ b/lib/logstash/outputs/elasticsearch/common.rb
@@ -313,6 +313,11 @@ def safe_bulk(actions)
         # Even though we retry the user should be made aware of these
         if e.response_code == 429
           logger.debug(message, log_hash)
+        # 413 means that the request is too large, so we try to decrease the
+        # request size if there actually were multiple actions in the bulk request.
+        elsif e.response_code == 413 && e.num_actions_in_request > 1
+          @client.target_bulk_bytes /= 2
+          logger.info(message + "and decreased request size", log_hash)
         else
           logger.error(message, log_hash)
         end
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index b46f4be0a..ad624bdfe 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -8,23 +8,11 @@
 require 'stringio'
 
 module LogStash; module Outputs; class ElasticSearch;
-  # This is a constant instead of a config option because
-  # there really isn't a good reason to configure it.
-  #
-  # The criteria used are:
-  # 1. We need a number that's less than 100MiB because ES
-  #    won't accept bulks larger than that.
-  # 2. It must be large enough to amortize the connection constant
-  #    across multiple requests.
-  # 3. It must be small enough that even if multiple threads hit this size
-  #    we won't use a lot of heap.
-  #
-  # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
-  # made sense. We picked one on the lowish side to not use too much heap.
-  TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
   class HttpClient
     attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
+    attr_accessor :target_bulk_bytes
+
     # This is here in case we use DEFAULT_OPTIONS in the future
     # DEFAULT_OPTIONS = {
     #   :setting => value
     # }
@@ -65,6 +53,18 @@ def initialize(options={})
       # mutex to prevent requests and sniffing to access the
       # connection pool at the same time
       @bulk_path = @options[:bulk_path]
+
+      # The criteria for deciding the initial value of target_bulk_bytes are:
+      # 1. We need a number that's less than 100MiB because ES
+      #    won't accept bulks larger than that.
+      # 2. It must be large enough to amortize the connection constant
+      #    across multiple requests.
+      # 3. It must be small enough that even if multiple threads hit this size
+      #    we won't use a lot of heap.
+      #
+      # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
+      # made sense. We picked one on the lowish side to not use too much heap.
+      @target_bulk_bytes ||= 20 * 1024 * 1024 # 20MiB
     end
 
     def build_url_template
@@ -93,7 +93,6 @@ def maximum_seen_major_version
     def bulk(actions)
       @action_count ||= 0
       @action_count += actions.size
-
       return if actions.empty?
 
       bulk_actions = actions.collect do |action, args, source|
@@ -114,18 +113,21 @@ def bulk(actions)
         stream_writer = body_stream
       end
       bulk_responses = []
+      actions_in_bulk = 0
       bulk_actions.each do |action|
         as_json = action.is_a?(Array) ?
                     action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                     LogStash::Json.dump(action)
         as_json << "\n"
-        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-          bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+        actions_in_bulk += 1
+        if (body_stream.size + as_json.bytesize) > @target_bulk_bytes
+          bulk_responses << bulk_send(body_stream, actions_in_bulk) unless body_stream.size == 0
+          actions_in_bulk = 0
         end
         stream_writer.write(as_json)
       end
       stream_writer.close if http_compression
-      bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+      bulk_responses << bulk_send(body_stream, actions_in_bulk) if body_stream.size > 0
       body_stream.close if !http_compression
       join_bulk_responses(bulk_responses)
     end
@@ -137,7 +139,7 @@ def join_bulk_responses(bulk_responses)
       }
     end
 
-    def bulk_send(body_stream)
+    def bulk_send(body_stream, num_actions_in_request)
       params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
       # Discard the URL
       response = @pool.post(@bulk_path, params, body_stream.string)
@@ -151,7 +153,7 @@ def bulk_send(body_stream)
       if response.code != 200
         url = ::LogStash::Util::SafeURI.new(response.final_url)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-          response.code, url, body_stream.to_s, response.body
+          response.code, url, body_stream.to_s, response.body, num_actions_in_request
         )
       end
 
diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
index 42f8236c4..b19bba1ff 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -2,13 +2,14 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient;
   class Pool
     class NoConnectionAvailableError < Error; end
     class BadResponseCodeError < Error
-      attr_reader :url, :response_code, :request_body, :response_body
+      attr_reader :url, :response_code, :request_body, :response_body, :num_actions_in_request
 
-      def initialize(response_code, url, request_body, response_body)
+      def initialize(response_code, url, request_body, response_body, num_actions_in_request = 1)
         @response_code = response_code
         @url = url
         @request_body = request_body
         @response_body = response_body
+        @num_actions_in_request = num_actions_in_request
       end
 
       def message
diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb
index efb7ca7f7..3abd96d4a 100644
--- a/spec/unit/outputs/elasticsearch/http_client_spec.rb
+++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -190,14 +190,13 @@
       ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
     ]}
 
-    context "if a message is over TARGET_BULK_BYTES" do
-      let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-      let(:message) { "a" * (target_bulk_bytes + 1) }
+    context "if a message is over target_bulk_bytes" do
+      let(:message) { "a" * (subject.target_bulk_bytes + 1) }
 
       it "should be handled properly" do
        allow(subject).to receive(:join_bulk_responses)
        expect(subject).to receive(:bulk_send).once do |data|
-         expect(data.size).to be > target_bulk_bytes
+         expect(data.size).to be > subject.target_bulk_bytes
        end
        s = subject.send(:bulk, actions)
      end
@@ -216,9 +215,8 @@
        s = subject.send(:bulk, actions)
      end
 
-     context "if one exceeds TARGET_BULK_BYTES" do
-       let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-       let(:message1) { "a" * (target_bulk_bytes + 1) }
+     context "if one exceeds target_bulk_bytes" do
+       let(:message1) { "a" * (subject.target_bulk_bytes + 1) }
        it "executes two bulk_send operations" do
          allow(subject).to receive(:join_bulk_responses)
          expect(subject).to receive(:bulk_send).twice
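A further note on the BadResponseCodeError change above: the new parameter
defaults to 1, so the signature stays backwards compatible with existing
four-argument callers. A usage sketch, assuming the whole
logstash-output-elasticsearch gem is loaded (requiring the pool file on its
own may miss the base Error class); the URL and body strings are
placeholders:

    require "logstash/outputs/elasticsearch/http_client/pool"

    pool = LogStash::Outputs::ElasticSearch::HttpClient::Pool

    # The new five-argument form raised by bulk_send after this patch:
    err = pool::BadResponseCodeError.new(413, "http://localhost:9200/_bulk",
                                         "<request body>", "<response body>", 250)
    err.num_actions_in_request # => 250

    # Pre-existing four-argument callers keep working unchanged:
    legacy = pool::BadResponseCodeError.new(404, "http://localhost:9200/_bulk",
                                            "<request body>", "<response body>")
    legacy.num_actions_in_request # => 1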
From 27e648efa3074321009ce6070f8ae0bc32f21657 Mon Sep 17 00:00:00 2001
From: msvticket
Date: Thu, 14 Jun 2018 13:47:45 +0200
Subject: [PATCH 2/3] Updating the integration test due to the change of
 TARGET_BULK_BYTES from a constant to an attribute

---
 spec/integration/outputs/index_spec.rb | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb
index 30e36e02b..96de532d8 100644
--- a/spec/integration/outputs/index_spec.rb
+++ b/spec/integration/outputs/index_spec.rb
@@ -1,8 +1,7 @@
 require_relative "../../../spec/es_spec_helper"
 require "logstash/outputs/elasticsearch"
 
-describe "TARGET_BULK_BYTES", :integration => true do
-  let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+describe "target_bulk_bytes", :integration => true do
   let(:event_count) { 1000 }
   let(:events) { event_count.times.map { event }.to_a }
   let(:config) {
@@ -22,11 +21,11 @@
   end
 
   describe "batches that are too large for one" do
-    let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
+    let(:event) { LogStash::Event.new("message" => "a " * (((subject.target_bulk_bytes/2) / event_count)+1)) }
 
     it "should send in two batches" do
       expect(subject.client).to have_received(:bulk_send).twice do |payload|
-        expect(payload.size).to be <= target_bulk_bytes
+        expect(payload.size).to be <= subject.target_bulk_bytes
       end
     end
 
@@ -37,7 +36,7 @@
 
     it "should send in one batch" do
       expect(subject.client).to have_received(:bulk_send).once do |payload|
-        expect(payload.size).to be <= target_bulk_bytes
+        expect(payload.size).to be <= subject.target_bulk_bytes
       end
     end
   end

From f2c43510bb1b62f8127345025729ddbf70dea8ff Mon Sep 17 00:00:00 2001
From: msvticket
Date: Thu, 14 Jun 2018 14:25:48 +0200
Subject: [PATCH 3/3] Fixing the integration test: target_bulk_bytes is an
 attribute of the client, so it must be accessed via subject.client

---
 spec/integration/outputs/index_spec.rb | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb
index 96de532d8..9389a6362 100644
--- a/spec/integration/outputs/index_spec.rb
+++ b/spec/integration/outputs/index_spec.rb
@@ -21,11 +21,11 @@
   end
 
   describe "batches that are too large for one" do
-    let(:event) { LogStash::Event.new("message" => "a " * (((subject.target_bulk_bytes/2) / event_count)+1)) }
+    let(:event) { LogStash::Event.new("message" => "a " * (((subject.client.target_bulk_bytes/2) / event_count)+1)) }
 
     it "should send in two batches" do
       expect(subject.client).to have_received(:bulk_send).twice do |payload|
-        expect(payload.size).to be <= subject.target_bulk_bytes
+        expect(payload.size).to be <= subject.client.target_bulk_bytes
       end
     end
 
@@ -36,7 +36,7 @@
 
     it "should send in one batch" do
       expect(subject.client).to have_received(:bulk_send).once do |payload|
-        expect(payload.size).to be <= subject.target_bulk_bytes
+        expect(payload.size).to be <= subject.client.target_bulk_bytes
       end
     end
   end
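One closing observation on the series as a whole: the new 413 branch only
ever shrinks target_bulk_bytes; nothing in the patch grows it back after a
successful flush, so a single 413 lowers the flush size for the life of the
output. The halving does converge quickly, though. A back-of-envelope check
in plain Ruby, illustrative only:

    bytes = 20 * 1024 * 1024 # the 20MiB starting point
    steps = 0
    while bytes > 1024       # even against a very strict 1KiB server limit...
      bytes /= 2
      steps += 1
    end
    puts steps # => 15, i.e. at most ~15 halvings before the cap complies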