Commit b9c66b5

fix: prevent sub-batch 413's from blocking whole batch (#972)
* fix: prevent sub-batch 413's from infinitely retrying whole batch

  The HTTP client breaks batches of actions into sub-batches that are up to 20MB in size, sending larger actions as batches-of-one, and zips the responses together to emulate a single batch response from the Elasticsearch API. When an individual HTTP request is rejected by Elasticsearch (or by an intermediate proxy or load-balancer) with an HTTP/1.1 413, we can emulate the error response instead of blowing an exception through to the whole batch. This allows only the offending events/actions to be subject to retry logic.

  Along the way, we improve logging at the `debug` level for sub-batches, and emit clear `warn`-level logs with payload sizes when we hit HTTP 413 rejections.

* size batch by _decompressed_ payload size

* tests: config elasticsearch to allow wildcard deletes

  The default value of Elasticsearch's `action.destructive_requires_name` has changed to true in elastic/elasticsearch#66908, which causes our integration tests' wildcard deletes to fail. By specifying this config explicitly, we ensure the desired behaviour is selected.
1 parent 9401759 commit b9c66b5
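
To make the 413 emulation concrete, here is a minimal standalone Ruby sketch (illustration only, not part of this commit) that mirrors the `emulate_batch_error_response` helper added to `http_client.rb` below; the sample sub-batch shape (`[{operation => params}, source]`, as built inside `#bulk`) is an assumption for the example.

# Standalone sketch: given the actions from a sub-batch rejected with a 413,
# fabricate a bulk-API-shaped response so that only those actions re-enter
# the existing retry logic (mirrors the emulate_batch_error_response helper below).
def emulate_batch_error_response(actions, http_code, reason)
  {
    "errors" => true,
    "items" => actions.map do |action|
      action = action.first if action.is_a?(Array) # unwrap [{op => params}, source] pairs
      request_action, _request_parameters = action.first
      { request_action => { "status" => http_code, "error" => { "type" => reason } } }
    end
  }
end

# Hypothetical rejected sub-batch of one action, shaped like the client's
# internal bulk_actions entries.
batch = [[{ "index" => { :_index => "logstash" } }, { "message" => "too big to index" }]]

p emulate_batch_error_response(batch, 413, "payload_too_large")
# => {"errors"=>true, "items"=>[{"index"=>{"status"=>413, "error"=>{"type"=>"payload_too_large"}}}]}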

File tree: 7 files changed, +150 −52 lines


.ci/elasticsearch-run.sh

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 #!/bin/bash
 set -ex
 
-/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node
+/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node -Eaction.destructive_requires_name=false

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -1,3 +1,8 @@
+## 10.8.6
+- Fixed an issue where a single over-size event being rejected by Elasticsearch would cause the entire batch to be retried indefinitely. The oversize event will still be retried on its own, and logging has been improved to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
+- Fixed an issue with `http_compression => true` where a well-compressed payload could fit under our outbound 20MB limit but expand beyond Elasticsearch's 100MB limit, causing bulk failures. Bulk grouping is now determined entirely by the decompressed payload size [#823](https://github.com/logstash-plugins/logstash-output-elasticsearch/issues/823)
+- Improved debug-level logging about bulk requests.
+
 ## 10.8.5
 - Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)
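
The second changelog entry above is easy to demonstrate: gzip can shrink a repetitive payload by orders of magnitude, so the compressed bytes we put on the wire say little about what Elasticsearch has to decompress. A hypothetical illustration in plain Ruby (not plugin code), using Elasticsearch's default 100MB `http.max_content_length` as the reference limit:

require 'zlib'

# Illustration only: 112MB of repetitive bulk-style lines -- well over the
# 100MB limit cited above -- gzips down to a tiny fraction of the plugin's
# 20MB outbound target, so sizing sub-batches by compressed bytes alone can
# let an oversize request through. Hence: group by decompressed payload size.
payload = %({"index":{"_index":"logstash"}}\n{"message":"aaaaaaaaaa"}\n) * 2_000_000
compressed = Zlib.gzip(payload)

puts "decompressed: #{payload.bytesize} bytes"    # 112,000,000
puts "compressed:   #{compressed.bytesize} bytes" # far below 20MB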

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 49 additions & 14 deletions
@@ -109,53 +109,88 @@ def bulk(actions)
       body_stream = StringIO.new
       if http_compression
         body_stream.set_encoding "BINARY"
-        stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
-      else
+        stream_writer = gzip_writer(body_stream)
+      else
         stream_writer = body_stream
       end
       bulk_responses = []
-      bulk_actions.each do |action|
+      batch_actions = []
+      bulk_actions.each_with_index do |action, index|
         as_json = action.is_a?(Array) ?
                     action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                     LogStash::Json.dump(action)
         as_json << "\n"
-        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-          bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+        if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
+          stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
+          logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
+                       :action_count => batch_actions.size,
+                       :payload_size => stream_writer.pos,
+                       :content_length => body_stream.size,
+                       :batch_offset => (index + 1 - batch_actions.size))
+          bulk_responses << bulk_send(body_stream, batch_actions)
+          body_stream.truncate(0) && body_stream.seek(0)
+          stream_writer = gzip_writer(body_stream) if http_compression
+          batch_actions.clear
         end
         stream_writer.write(as_json)
+        batch_actions << action
       end
       stream_writer.close if http_compression
-      bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+      logger.debug("Sending final bulk request for batch.",
+                   :action_count => batch_actions.size,
+                   :payload_size => stream_writer.pos,
+                   :content_length => body_stream.size,
+                   :batch_offset => (actions.size - batch_actions.size))
+      bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
       body_stream.close if !http_compression
       join_bulk_responses(bulk_responses)
     end
 
+    def gzip_writer(io)
+      fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
+      fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0
+
+      Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
+    end
+
     def join_bulk_responses(bulk_responses)
       {
         "errors" => bulk_responses.any? {|r| r["errors"] == true},
         "items" => bulk_responses.reduce([]) {|m,r| m.concat(r.fetch("items", []))}
       }
     end
 
-    def bulk_send(body_stream)
+    def bulk_send(body_stream, batch_actions)
       params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
-      # Discard the URL
       response = @pool.post(@bulk_path, params, body_stream.string)
-      if !body_stream.closed?
-        body_stream.truncate(0)
-        body_stream.seek(0)
-      end
 
       @bulk_response_metrics.increment(response.code.to_s)
 
-      if response.code != 200
+      case response.code
+      when 200 # OK
+        LogStash::Json.load(response.body)
+      when 413 # Payload Too Large
+        logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
+        emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
+      else
         url = ::LogStash::Util::SafeURI.new(response.final_url)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
           response.code, url, body_stream.to_s, response.body
         )
       end
+    end
 
-      LogStash::Json.load(response.body)
+    def emulate_batch_error_response(actions, http_code, reason)
+      {
+        "errors" => true,
+        "items" => actions.map do |action|
+          action = action.first if action.is_a?(Array)
+          request_action, request_parameters = action.first
+          {
+            request_action => {"status" => http_code, "error" => { "type" => reason }}
+          }
+        end
+      }
     end
 
     def get(path)
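
A note on the sizing logic in the hunk above: `Zlib::GzipWriter#pos` reports the uncompressed bytes fed into the writer, while the underlying `StringIO` grows only by the compressed bytes, so `#bulk` can compare `stream_writer.pos` against `TARGET_BULK_BYTES` and log both values as `:payload_size` and `:content_length`. A small standalone illustration (not plugin code):

require 'zlib'
require 'stringio'

body_stream = StringIO.new
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream)

# One oversized bulk line: roughly 1MB of uncompressed JSON.
stream_writer.write(%({"index":{"_index":"logstash"}}\n{"message":"#{'a' * 1_000_000}"}\n))
stream_writer.flush # push buffered data through to the underlying StringIO

puts "payload_size (uncompressed): #{stream_writer.pos}" # just over 1,000,000
puts "content_length (compressed): #{body_stream.size}"  # orders of magnitude smaller
stream_writer.close

This is also why the new `gzip_writer` helper insists on an empty, fully-read IO: after each partial send the `StringIO` is truncated and a fresh writer is created over it.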

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 1 addition & 1 deletion
@@ -299,7 +299,7 @@ def safe_bulk(actions)
       retry unless @stopping.true?
     rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
       @bulk_request_metrics.increment(:failures)
-      log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
+      log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
       log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
       message = "Encountered a retryable error. Will Retry with exponential backoff "

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '10.8.5'
+  s.version = '10.8.6'
 
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"

spec/unit/outputs/elasticsearch/http_client_spec.rb

Lines changed: 51 additions & 32 deletions
@@ -204,50 +204,69 @@
   end
 
   describe "#bulk" do
-    subject { described_class.new(base_options) }
+    subject(:http_client) { described_class.new(base_options) }
 
     require "json"
     let(:message) { "hey" }
     let(:actions) { [
       ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
     ]}
 
-    context "if a message is over TARGET_BULK_BYTES" do
-      let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-      let(:message) { "a" * (target_bulk_bytes + 1) }
+    [true,false].each do |http_compression_enabled|
+      context "with `http_compression => #{http_compression_enabled}`" do
 
-      it "should be handled properly" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once do |data|
-          expect(data.size).to be > target_bulk_bytes
+        let(:base_options) { super().merge(:client_settings => {:http_compression => http_compression_enabled}) }
+
+        before(:each) do
+          if http_compression_enabled
+            expect(http_client).to receive(:gzip_writer).at_least(:once).and_call_original
+          else
+            expect(http_client).to_not receive(:gzip_writer)
+          end
         end
-        s = subject.send(:bulk, actions)
-      end
-    end
 
-    context "with two messages" do
-      let(:message1) { "hey" }
-      let(:message2) { "you" }
-      let(:actions) { [
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
-      ]}
-      it "executes one bulk_send operation" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once
-        s = subject.send(:bulk, actions)
-      end
+        context "if a message is over TARGET_BULK_BYTES" do
+          let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+          let(:message) { "a" * (target_bulk_bytes + 1) }
+
+          it "should be handled properly" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once do |data|
+              if !http_compression_enabled
+                expect(data.size).to be > target_bulk_bytes
+              else
+                expect(Zlib::gunzip(data.string).size).to be > target_bulk_bytes
+              end
+            end
+            s = subject.send(:bulk, actions)
+          end
+        end
+
+        context "with two messages" do
+          let(:message1) { "hey" }
+          let(:message2) { "you" }
+          let(:actions) { [
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
+          ]}
+          it "executes one bulk_send operation" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once
+            s = subject.send(:bulk, actions)
+          end
 
-      context "if one exceeds TARGET_BULK_BYTES" do
-        let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-        let(:message1) { "a" * (target_bulk_bytes + 1) }
-        it "executes two bulk_send operations" do
-          allow(subject).to receive(:join_bulk_responses)
-          expect(subject).to receive(:bulk_send).twice
-          s = subject.send(:bulk, actions)
+          context "if one exceeds TARGET_BULK_BYTES" do
+            let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+            let(:message1) { "a" * (target_bulk_bytes + 1) }
+            it "executes two bulk_send operations" do
+              allow(subject).to receive(:join_bulk_responses)
+              expect(subject).to receive(:bulk_send).twice
+              s = subject.send(:bulk, actions)
+            end
+          end
         end
-      end
-    end
+      end
+    end
   end
 
   describe "sniffing" do

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 42 additions & 3 deletions
@@ -4,7 +4,7 @@
 require "logstash/outputs/elasticsearch"
 
 describe LogStash::Outputs::ElasticSearch do
-  subject { described_class.new(options) }
+  subject(:elasticsearch_output_instance) { described_class.new(options) }
   let(:options) { {} }
   let(:maximum_seen_major_version) { [1,2,5,6,7,8].sample }
 
@@ -265,12 +265,14 @@
     let(:event) { ::LogStash::Event.new("foo" => "bar") }
     let(:error) do
       ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-        429, double("url").as_null_object, double("request body"), double("response body")
+        429, double("url").as_null_object, request_body, double("response body")
       )
     end
     let(:logger) { double("logger").as_null_object }
     let(:response) { { :errors => [], :items => [] } }
 
+    let(:request_body) { double(:request_body, :bytesize => 1023) }
+
     before(:each) do
 
       i = 0
@@ -326,7 +328,7 @@
     end
 
     before(:each) do
-      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
+      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
        expect( stream.string ).to include '"foo":"bar1"'
        expect( stream.string ).to include '"foo":"bar2"'
      end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
@@ -350,6 +352,43 @@
       end
     end
 
+    context '413 errors' do
+      let(:payload_size) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES + 1024 }
+      let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }
+
+      let(:logger_stub) { double("logger").as_null_object }
+
+      before(:each) do
+        allow(elasticsearch_output_instance.client).to receive(:logger).and_return(logger_stub)
+
+        allow(elasticsearch_output_instance.client).to receive(:bulk).and_call_original
+
+        max_bytes = payload_size * 3 / 4 # ensure a failure first attempt
+        allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
+          if body.length > max_bytes
+            max_bytes *= 2 # ensure a successful retry
+            double("Response", :code => 413, :body => "")
+          else
+            double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
+          end
+        end
+      end
+
+      it 'retries the 413 until it goes away' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(elasticsearch_output_instance.client).to have_received(:bulk).twice
+      end
+
+      it 'logs about payload quantity and size' do
+        elasticsearch_output_instance.multi_receive([event])
+
+        expect(logger_stub).to have_received(:warn)
+          .with(a_string_matching(/413 Payload Too Large/),
+                hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
+      end
+    end
+
     context "with timeout set" do
       let(:listener) { Flores::Random.tcp_listener }
       let(:port) { listener[2] }
