fix: prevent sub-batch 413's from blocking whole batch #972

Merged · 3 commits · Apr 6, 2021
2 changes: 1 addition & 1 deletion .ci/elasticsearch-run.sh
@@ -1,4 +1,4 @@
#!/bin/bash
set -ex

/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node
/usr/share/elasticsearch/bin/elasticsearch -Ediscovery.type=single-node -Eaction.destructive_requires_name=false
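The CI script change is a test-environment prerequisite rather than part of the fix itself: when `action.destructive_requires_name` is `true`, Elasticsearch refuses index deletions by wildcard, which integration-test cleanup typically relies on. A hypothetical sketch of the kind of call the flag keeps working (the client setup and index pattern here are assumptions, not taken from this repository):

```ruby
require 'elasticsearch'

# Hypothetical integration-test cleanup. With
# action.destructive_requires_name=true, Elasticsearch rejects the
# wildcard pattern and this call raises instead of deleting.
client = Elasticsearch::Client.new(url: 'http://localhost:9200')
client.indices.delete(index: 'logstash-*')
```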
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 10.8.6
- Fixed an issue where a single oversized event being rejected by Elasticsearch would cause the entire batch to be retried indefinitely. The oversized event will still be retried on its own, and logging has been improved to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
- Fixed an issue with `http_compression => true` where a well-compressed payload could fit under our outbound 20MB limit but expand beyond Elasticsearch's 100MB limit, causing bulk failures. Bulk grouping is now determined entirely by the decompressed payload size [#823](https://github.com/logstash-plugins/logstash-output-elasticsearch/issues/823)
- Improved debug-level logging about bulk requests.

## 10.8.5
- Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)

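The compression fix described above hinges on where sizes are measured: `Zlib::GzipWriter#pos` reports the uncompressed bytes written so far, while the underlying `StringIO` holds only the compressed bytes. A minimal sketch of that distinction, assuming nothing beyond the Ruby standard library:

```ruby
require 'zlib'
require 'stringio'

io = StringIO.new
io.set_encoding(Encoding::BINARY)
gz = Zlib::GzipWriter.new(io)

gz.write('{"message":"' + 'a' * 10_000 + '"}' + "\n")
gz.flush # push buffered data through to the StringIO

gz.pos   # uncompressed bytes written -- what Elasticsearch sees after inflating
io.size  # compressed bytes on the wire -- far smaller for repetitive payloads
```

Grouping sub-batches by `stream_writer.pos` instead of `body_stream.size` is what keeps a well-compressed request from inflating past Elasticsearch's decompressed limit.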
63 changes: 49 additions & 14 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -109,53 +109,88 @@ def bulk(actions)
body_stream = StringIO.new
if http_compression
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
else
stream_writer = gzip_writer(body_stream)
else
stream_writer = body_stream
end
bulk_responses = []
bulk_actions.each do |action|
batch_actions = []
bulk_actions.each_with_index do |action, index|
as_json = action.is_a?(Array) ?
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"
if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
:action_count => batch_actions.size,
:payload_size => stream_writer.pos,
:content_length => body_stream.size,
:batch_offset => (index + 1 - batch_actions.size))
bulk_responses << bulk_send(body_stream, batch_actions)
body_stream.truncate(0) && body_stream.seek(0)
stream_writer = gzip_writer(body_stream) if http_compression
batch_actions.clear
end
stream_writer.write(as_json)
batch_actions << action
end
stream_writer.close if http_compression
bulk_responses << bulk_send(body_stream) if body_stream.size > 0
logger.debug("Sending final bulk request for batch.",
:action_count => batch_actions.size,
:payload_size => stream_writer.pos,
:content_length => body_stream.size,
:batch_offset => (actions.size - batch_actions.size))
bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
body_stream.close if !http_compression
join_bulk_responses(bulk_responses)
end

def gzip_writer(io)
fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0

Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
end

def join_bulk_responses(bulk_responses)
{
"errors" => bulk_responses.any? {|r| r["errors"] == true},
"items" => bulk_responses.reduce([]) {|m,r| m.concat(r.fetch("items", []))}
}
end

def bulk_send(body_stream)
def bulk_send(body_stream, batch_actions)
params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
# Discard the URL
response = @pool.post(@bulk_path, params, body_stream.string)
if !body_stream.closed?
body_stream.truncate(0)
body_stream.seek(0)
end

@bulk_response_metrics.increment(response.code.to_s)

if response.code != 200
case response.code
when 200 # OK
LogStash::Json.load(response.body)
when 413 # Payload Too Large
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
else
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
)
end
end

LogStash::Json.load(response.body)
def emulate_batch_error_response(actions, http_code, reason)
{
"errors" => true,
"items" => actions.map do |action|
action = action.first if action.is_a?(Array)
request_action, request_parameters = action.first
{
request_action => {"status" => http_code, "error" => { "type" => reason }}
}
end
}
end

def get(path)
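Taken together, `bulk` now remembers which actions went into each sub-request so that `bulk_send` can synthesize a per-action error when Elasticsearch rejects the whole sub-request with a 413. A sketch of the emulated response shape, assuming the `[header_hash, source]` tuples the client builds internally (the values below are hypothetical):

```ruby
# One oversized action, in the shape bulk_actions uses internally.
batch = [
  [{ "index" => { :_index => "logstash" } }, { "message" => "a" * 30_000_000 }]
]

emulate_batch_error_response(batch, 413, 'payload_too_large')
# => {
#      "errors" => true,
#      "items"  => [
#        { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } }
#      ]
#    }
```

Because this mirrors a real `_bulk` response body, the per-item retry machinery downstream can handle the rejected actions individually instead of one 413 forcing the whole batch to be retried indefinitely.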
2 changes: 1 addition & 1 deletion lib/logstash/plugin_mixins/elasticsearch/common.rb
@@ -299,7 +299,7 @@ def safe_bulk(actions)
retry unless @stopping.true?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_request_metrics.increment(:failures)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :content_length => e.request_body.bytesize}
log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
message = "Encountered a retryable error. Will Retry with exponential backoff "

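With `:content_length` in the log hash, a retryable failure now reports how large the rejected request body was, which makes 413s much easier to diagnose. An illustrative call with hypothetical values (the excerpt above does not show the actual logging statement):

```ruby
# Illustrative only -- field values are made up for the example.
@logger.error(
  "Encountered a retryable error. Will Retry with exponential backoff ",
  :code => 413,
  :url => "http://localhost:9200/_bulk",
  :content_length => 20_971_520 # bytes in the rejected request body
)
```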
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '10.8.5'
s.version = '10.8.6'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
Expand Down
83 changes: 51 additions & 32 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -204,50 +204,69 @@
end

describe "#bulk" do
subject { described_class.new(base_options) }
subject(:http_client) { described_class.new(base_options) }

require "json"
let(:message) { "hey" }
let(:actions) { [
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
]}

context "if a message is over TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message) { "a" * (target_bulk_bytes + 1) }
[true,false].each do |http_compression_enabled|
context "with `http_compression => #{http_compression_enabled}`" do

it "should be handled properly" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once do |data|
expect(data.size).to be > target_bulk_bytes
let(:base_options) { super().merge(:client_settings => {:http_compression => http_compression_enabled}) }

before(:each) do
if http_compression_enabled
expect(http_client).to receive(:gzip_writer).at_least(:once).and_call_original
else
expect(http_client).to_not receive(:gzip_writer)
end
end
s = subject.send(:bulk, actions)
end
end

context "with two messages" do
let(:message1) { "hey" }
let(:message2) { "you" }
let(:actions) { [
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
]}
it "executes one bulk_send operation" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once
s = subject.send(:bulk, actions)
end
context "if a message is over TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message) { "a" * (target_bulk_bytes + 1) }

it "should be handled properly" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once do |data|
if !http_compression_enabled
expect(data.size).to be > target_bulk_bytes
else
expect(Zlib::gunzip(data.string).size).to be > target_bulk_bytes
end
end
s = subject.send(:bulk, actions)
end
end

context "with two messages" do
let(:message1) { "hey" }
let(:message2) { "you" }
let(:actions) { [
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
]}
it "executes one bulk_send operation" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once
s = subject.send(:bulk, actions)
end

context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message1) { "a" * (target_bulk_bytes + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).twice
s = subject.send(:bulk, actions)
context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message1) { "a" * (target_bulk_bytes + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).twice
s = subject.send(:bulk, actions)
end
end
end
end
end
end
end
end

describe "sniffing" do
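In the compressed branch the spec inflates the stream before asserting on size, since the compressed `StringIO` can sit far below `TARGET_BULK_BYTES` even when the payload exceeds it. A minimal round-trip sketch using only the standard library (`Zlib.gunzip` requires Ruby 2.4+):

```ruby
require 'zlib'
require 'stringio'

io = StringIO.new
gz = Zlib::GzipWriter.new(io)
gz.write('a' * 100_000) # highly compressible payload
gz.close                # finalizes the gzip trailer (StringIO#string stays readable)

io.string.bytesize              # compressed: a few hundred bytes
Zlib.gunzip(io.string).bytesize # => 100000 -- the size the assertion cares about
```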
45 changes: 42 additions & 3 deletions spec/unit/outputs/elasticsearch_spec.rb
@@ -4,7 +4,7 @@
require "logstash/outputs/elasticsearch"

describe LogStash::Outputs::ElasticSearch do
subject { described_class.new(options) }
subject(:elasticsearch_output_instance) { described_class.new(options) }
let(:options) { {} }
let(:maximum_seen_major_version) { [1,2,5,6,7,8].sample }

@@ -265,12 +265,14 @@
let(:event) { ::LogStash::Event.new("foo" => "bar") }
let(:error) do
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
429, double("url").as_null_object, double("request body"), double("response body")
429, double("url").as_null_object, request_body, double("response body")
)
end
let(:logger) { double("logger").as_null_object }
let(:response) { { :errors => [], :items => [] } }

let(:request_body) { double(:request_body, :bytesize => 1023) }

before(:each) do

i = 0
@@ -326,7 +328,7 @@
end

before(:each) do
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
expect( stream.string ).to include '"foo":"bar1"'
expect( stream.string ).to include '"foo":"bar2"'
end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
@@ -350,6 +352,43 @@
end
end

context '413 errors' do
let(:payload_size) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES + 1024 }
let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }

let(:logger_stub) { double("logger").as_null_object }

before(:each) do
allow(elasticsearch_output_instance.client).to receive(:logger).and_return(logger_stub)

allow(elasticsearch_output_instance.client).to receive(:bulk).and_call_original

max_bytes = payload_size * 3 / 4 # ensure a failure on the first attempt
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
if body.length > max_bytes
max_bytes *= 2 # ensure a successful retry
double("Response", :code => 413, :body => "")
else
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
end
end
end

it 'retries the 413 until it goes away' do
elasticsearch_output_instance.multi_receive([event])

expect(elasticsearch_output_instance.client).to have_received(:bulk).twice
end

it 'logs about payload quantity and size' do
elasticsearch_output_instance.multi_receive([event])

expect(logger_stub).to have_received(:warn)
.with(a_string_matching(/413 Payload Too Large/),
hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
end
end

context "with timeout set" do
let(:listener) { Flores::Random.tcp_listener }
let(:port) { listener[2] }