Commit 9a43f31
fix: prevent sub-batch 413s from infinitely retrying whole batch
The HTTP client breaks batches of actions into sub-batches that are up to 20MB in size, sends larger actions as batches-of-one, and zips the responses together to emulate a single batch response from the Elasticsearch API. When an individual HTTP request is rejected by Elasticsearch (or by an intermediate proxy or load balancer) with an HTTP/1.1 413, we can now emulate the error response instead of blowing an exception through to the whole batch, so that only the offending events/actions are subject to retry logic.

Along the way, we improve `debug`-level logging for sub-batches, and emit clear `warn`-level logs with payload sizes when we hit HTTP 413 rejections.
1 parent ce8f8ce commit 9a43f31
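
In outline, the fixed flow looks like the sketch below. This is an illustration only, not the plugin's code: TARGET_BULK_BYTES and the shape of the emulated error come from the diffs further down, while send_sub_batches, dispatch, the post block, and the hard-coded "index" action name are hypothetical stand-ins (the real method derives the action name from each request).

TARGET_BULK_BYTES = 20 * 1024 * 1024 # same name and size as the plugin's constant

def dispatch(payloads, &post)
  code, items = post.call(payloads.join)
  return { "errors" => false, "items" => items } if code == 200
  if code == 413
    # Emulate a bulk error response so only this sub-batch's actions retry.
    errors = payloads.map { { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } } }
    return { "errors" => true, "items" => errors }
  end
  raise "retryable HTTP #{code}" # before this fix, a 413 also took this path
end

def send_sub_batches(ndjson_actions, &post)
  responses, buffer, bytes = [], [], 0
  ndjson_actions.each do |json|
    if bytes + json.bytesize > TARGET_BULK_BYTES && bytes > 0
      responses << dispatch(buffer, &post) # flush the filled sub-batch
      buffer, bytes = [], 0
    end
    buffer << json # an oversize action simply becomes a sub-batch of one
    bytes += json.bytesize
  end
  responses << dispatch(buffer, &post) if bytes > 0
  # Zip the sub-batch responses into one batch-shaped response.
  { "errors" => responses.any? { |r| r["errors"] },
    "items"  => responses.flat_map { |r| r["items"] } }
end

Because a 413 now comes back as ordinary per-item errors, the caller's existing retry logic resubmits only those items instead of looping on the whole batch.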

File tree: 5 files changed, +80 -16 lines

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -1,3 +1,6 @@
+## 10.7.3
+- Fixed an issue where a single over-size event being rejected by Elasticsearch would cause the entire batch to be retried indefinitely. The oversize event will still be retried on its own, and logging has been improved to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
+
 ## 10.7.2
 - [DOC] Fixed links to restructured Logstash-to-cloud docs [#975](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/975)

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 1 addition & 1 deletion

@@ -414,7 +414,7 @@ def safe_bulk(actions)
       retry unless @stopping.true?
     rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
       @bulk_request_metrics.increment(:failures)
-      log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
+      log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
       log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
       message = "Encountered a retryable error. Will Retry with exponential backoff "
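
One detail worth calling out in the added field: the size is taken with `bytesize`, not `length`, so the logged `content_length` reflects bytes on the wire even when the payload contains multi-byte characters. A quick standalone illustration:

# bytesize counts encoded bytes; length counts characters.
"héllo".length   # => 5
"héllo".bytesize # => 6 ("é" is two bytes in UTF-8)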

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 34 additions & 12 deletions

@@ -114,18 +114,23 @@ def bulk(actions)
        stream_writer = body_stream
      end
      bulk_responses = []
-     bulk_actions.each do |action|
+     batch_actions = []
+     bulk_actions.each_with_index do |action, index|
        as_json = action.is_a?(Array) ?
                    action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                    LogStash::Json.dump(action)
        as_json << "\n"
-       if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-         bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+       if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES && body_stream.size > 0
+         logger.debug("Sending partial bulk request for batch with one or more actions remaining.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (index + 1 - batch_actions.size))
+         bulk_responses << bulk_send(body_stream, batch_actions)
+         batch_actions.clear
        end
        stream_writer.write(as_json)
+       batch_actions << action
      end
      stream_writer.close if http_compression
-     bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+     logger.debug("Sending final bulk request for batch.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (actions.size - batch_actions.size))
+     bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
      body_stream.close if !http_compression
      join_bulk_responses(bulk_responses)
    end
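
For reference, the `as_json` branch in the hunk above covers the two shapes a bulk action can take. A standalone illustration, substituting the stdlib `json` for `LogStash::Json` and using made-up index/document values:

require "json"

# An index/create/update action is a [header, document] pair and serializes
# to two NDJSON lines; a bare header (e.g. a delete) serializes to one.
action = [{ "index" => { "_index" => "logs" } }, { "message" => "hello" }]
as_json = action.is_a?(Array) ?
            action.map { |line| JSON.dump(line) }.join("\n") :
            JSON.dump(action)
as_json << "\n"
print as_json
# {"index":{"_index":"logs"}}
# {"message":"hello"}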
@@ -137,25 +142,42 @@ def join_bulk_responses(bulk_responses)
       }
     end

-    def bulk_send(body_stream)
+    def bulk_send(body_stream, batch_actions)
       params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
-      # Discard the URL
       response = @pool.post(@bulk_path, params, body_stream.string)
-      if !body_stream.closed?
-        body_stream.truncate(0)
-        body_stream.seek(0)
-      end

       @bulk_response_metrics.increment(response.code.to_s)

-      if response.code != 200
+      case response.code
+      when 200 # OK
+        LogStash::Json.load(response.body)
+      when 413 # Payload Too Large
+        logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
+        emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
+      else
         url = ::LogStash::Util::SafeURI.new(response.final_url)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
           response.code, url, body_stream.to_s, response.body
         )
       end
+    ensure
+      if !body_stream.closed?
+        body_stream.truncate(0)
+        body_stream.seek(0)
+      end
+    end

-      LogStash::Json.load(response.body)
+    def emulate_batch_error_response(actions, http_code, reason)
+      {
+        "errors" => true,
+        "items" => actions.map do |action|
+          action = action.first if action.is_a?(Array)
+          request_action, request_parameters = action.first
+          {
+            request_action => {"status" => http_code, "error" => { "type" => reason }}
+          }
+        end
+      }
     end

     def get(path)
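
Tracing one rejected index action through `emulate_batch_error_response` above: each action pair's header hash yields `request_action` (here "index"), so the emulated body has the same shape as a genuine bulk response with errors and can be merged by `join_bulk_responses` like any other. With illustrative values:

# batch_actions = [[{"index" => {"_index" => "logs"}}, {"message" => "..."}]]
# emulate_batch_error_response(batch_actions, 413, 'payload_too_large') returns:
{
  "errors" => true,
  "items"  => [
    { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } }
  ]
}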

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '10.7.2'
+  s.version = '10.7.3'

   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 41 additions & 2 deletions

@@ -4,7 +4,7 @@
 require "logstash/outputs/elasticsearch"

 describe LogStash::Outputs::ElasticSearch do
-  subject { described_class.new(options) }
+  subject(:elasticsearch_output_instance) { described_class.new(options) }
   let(:options) { {} }
   let(:maximum_seen_major_version) { [1,2,5,6,7,8].sample }

@@ -265,12 +265,14 @@
     let(:event) { ::LogStash::Event.new("foo" => "bar") }
     let(:error) do
       ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-        429, double("url").as_null_object, double("request body"), double("response body")
+        429, double("url").as_null_object, request_body, double("response body")
       )
     end
     let(:logger) { double("logger").as_null_object }
     let(:response) { { :errors => [], :items => [] } }

+    let(:request_body) { double(:request_body, :bytesize => 1023) }
+
     before(:each) do

       i = 0

@@ -298,6 +300,43 @@
     end
   end

+  context '413 errors' do
+    let(:payload_size) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES + 1024 }
+    let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }
+
+    let(:logger_stub) { double("logger").as_null_object }
+
+    before(:each) do
+      allow(elasticsearch_output_instance.client).to receive(:logger).and_return(logger_stub)
+
+      allow(elasticsearch_output_instance.client).to receive(:bulk).and_call_original
+
+      max_bytes = payload_size * 3 / 4 # ensure a failure first attempt
+      allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
+        if body.length > max_bytes
+          max_bytes *= 2 # ensure a successful retry
+          double("Response", :code => 413, :body => "")
+        else
+          double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
+        end
+      end
+    end
+
+    it 'retries the 413 until it goes away' do
+      elasticsearch_output_instance.multi_receive([event])
+
+      expect(elasticsearch_output_instance.client).to have_received(:bulk).twice
+    end
+
+    it 'logs about payload quantity and size' do
+      elasticsearch_output_instance.multi_receive([event])
+
+      expect(logger_stub).to have_received(:warn)
+        .with(a_string_matching(/413 Payload Too Large/),
+              hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
+    end
+  end
+
   context "with timeout set" do
     let(:listener) { Flores::Random.tcp_listener }
     let(:port) { listener[2] }
