
Commit 2cd4843

fix: prevent sub-batch 413's from infinitely retrying whole batch
The HTTP client breaks batches of actions into sub-batches that are up to 20MB in size, sending larger actions as batches-of-one, and zips the responses together to emulate a single batch response from the Elasticsearch API. When an individual HTTP request is rejected by Elasticsearch (or by an intermediate proxy or load-balancer) with an HTTP/1.1 413, we can emulate the error response instead of blowing an exception through to the whole batch. This allows only the offending events/actions to be subject to retry logic.

Along the way, we improve logging at the `debug` level for sub-batches, and emit clear `warn`-level logs with payload sizes when we hit HTTP 413 rejections.
1 parent: 1e6148c
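To make the sub-batching described above concrete, here is a minimal, self-contained sketch of the idea. It is illustrative only: the method name `sub_batches`, its argument, and the standalone constant are assumptions for the example, while the real implementation streams serialized actions into an IO object (see the `http_client.rb` diff below).

```ruby
# Illustrative sketch of the sub-batching behavior -- not the plugin's
# actual code, which streams into body_stream (see http_client.rb below).
TARGET_BULK_BYTES = 20 * 1024 * 1024 # ~20MB, per the commit message

def sub_batches(serialized_actions)
  batches, current, current_bytes = [], [], 0
  serialized_actions.each do |payload|
    # Flush the current sub-batch when the next payload would overflow it;
    # an action that is itself over the limit therefore ships as a batch-of-one.
    if current_bytes + payload.bytesize > TARGET_BULK_BYTES && current_bytes > 0
      batches << current
      current, current_bytes = [], 0
    end
    current << payload
    current_bytes += payload.bytesize
  end
  batches << current unless current.empty?
  batches
end
```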

File tree

4 files changed: +79 −15 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,3 +1,6 @@
+## 10.7.2
+- bugfix: prevent a single over-size event from causing entire batch to be retried indefinitely, and improved logging to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
+
 ## 10.7.1
 - [DOC] Document the permissions required in secured clusters [#969](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/969)
```

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 1 addition & 1 deletion
```diff
@@ -414,7 +414,7 @@ def safe_bulk(actions)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
        @bulk_request_metrics.increment(:failures)
-       log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
+       log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
        log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
        message = "Encountered a retryable error. Will Retry with exponential backoff "
```
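Because `bulk_send` raises `BadResponseCodeError` with the request body attached (see the `http_client.rb` diff below), this rescue can now report how large the rejected payload was. The shape of the enriched log data, with purely illustrative values:

```ruby
# Illustrative values only -- the enriched log_hash now carries the
# request size alongside the status code and sanitized URL:
log_hash = {:code => 429, :url => "http://127.0.0.1:9200/", :content_length => 20_971_520}
```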

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 34 additions & 12 deletions
```diff
@@ -114,18 +114,23 @@ def bulk(actions)
        stream_writer = body_stream
      end
      bulk_responses = []
-     bulk_actions.each do |action|
+     batch_actions = []
+     bulk_actions.each_with_index do |action, index|
        as_json = action.is_a?(Array) ?
                    action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                    LogStash::Json.dump(action)
        as_json << "\n"
-       if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-         bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+       if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES && body_stream.size > 0
+         logger.debug("Sending partial bulk request for batch with one or more actions remaining.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (index + 1 - batch_actions.size))
+         bulk_responses << bulk_send(body_stream, batch_actions)
+         batch_actions.clear
        end
        stream_writer.write(as_json)
+       batch_actions << action
      end
      stream_writer.close if http_compression
-     bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+     logger.debug("Sending final bulk request for batch.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (actions.size - batch_actions.size))
+     bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
      body_stream.close if !http_compression
      join_bulk_responses(bulk_responses)
    end
@@ -137,25 +142,42 @@ def join_bulk_responses(bulk_responses)
      }
    end

-   def bulk_send(body_stream)
+   def bulk_send(body_stream, batch_actions)
      params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
-     # Discard the URL
      response = @pool.post(@bulk_path, params, body_stream.string)
-     if !body_stream.closed?
-       body_stream.truncate(0)
-       body_stream.seek(0)
-     end

      @bulk_response_metrics.increment(response.code.to_s)

-     if response.code != 200
+     case response.code
+     when 200 # OK
+       LogStash::Json.load(response.body)
+     when 413 # Payload Too Large
+       logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
+       emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
+     else
        url = ::LogStash::Util::SafeURI.new(response.final_url)
        raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
          response.code, url, body_stream.to_s, response.body
        )
      end
+   ensure
+     if !body_stream.closed?
+       body_stream.truncate(0)
+       body_stream.seek(0)
+     end
+   end

-     LogStash::Json.load(response.body)
+   def emulate_batch_error_response(actions, http_code, reason)
+     {
+       "errors" => true,
+       "items" => actions.map do |action|
+         action = action.first if action.is_a?(Array)
+         request_action, request_parameters = action.first
+         {
+           request_action => {"status" => http_code, "error" => { "type" => reason }}
+         }
+       end
+     }
    end

    def get(path)
```
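The emulated hash has the same shape as a real bulk response containing per-item failures, so the existing retry machinery singles out just the rejected actions. For a sub-batch containing one index action, the helper above would evaluate to something like:

```ruby
# Result of emulate_batch_error_response(actions, 413, 'payload_too_large')
# for a batch containing a single index action; "status" and "error.type"
# are what the per-item retry logic inspects.
emulated = {
  "errors" => true,
  "items"  => [
    {"index" => {"status" => 413, "error" => {"type" => "payload_too_large"}}}
  ]
}
```

Returning this instead of raising keeps actions from healthy sub-batches out of the retry path entirely.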

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 41 additions & 2 deletions
```diff
@@ -4,7 +4,7 @@
 require "logstash/outputs/elasticsearch"

 describe LogStash::Outputs::ElasticSearch do
-  subject { described_class.new(options) }
+  subject(:elasticsearch_output_instance) { described_class.new(options) }
   let(:options) { {} }
   let(:maximum_seen_major_version) { [1,2,5,6,7,8].sample }

@@ -265,12 +265,14 @@
     let(:event) { ::LogStash::Event.new("foo" => "bar") }
     let(:error) do
       ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-        429, double("url").as_null_object, double("request body"), double("response body")
+        429, double("url").as_null_object, request_body, double("response body")
       )
     end
     let(:logger) { double("logger").as_null_object }
     let(:response) { { :errors => [], :items => [] } }

+    let(:request_body) { double(:request_body, :bytesize => 1023) }
+
     before(:each) do

       i = 0
@@ -298,6 +300,43 @@
       end
     end

+    context '413 errors' do
+      let(:payload_size) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES + 1024 }
+      let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }
+
+      let(:logger_stub) { double("logger").as_null_object }
+
+      before(:each) do
+        allow(elasticsearch_output_instance.client).to receive(:logger).and_return(logger_stub)
+
+        allow(elasticsearch_output_instance.client).to receive(:bulk).and_call_original
+
+        max_bytes = payload_size * 3 / 4 # ensure a failure first attempt
+        allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
+          if body.length > max_bytes
+            max_bytes *= 2 # ensure a successful retry
+            double("Response", :code => 413, :body => "")
+          else
+            double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
+          end
+        end
+      end
+
+      it 'retries the 413 until it goes away' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(elasticsearch_output_instance.client).to have_received(:bulk).twice
+      end
+
+      it 'logs about payload quantity and size' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(logger_stub).to have_received(:warn)
+          .with(a_string_matching(/413 Payload Too Large/),
+                hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
+      end
+    end
+
     context "with timeout set" do
       let(:listener) { Flores::Random.tcp_listener }
       let(:port) { listener[2] }
```
