Commit 3f1bf7b

fix: prevent sub-batch 413's from blocking whole batch
The HTTP client breaks batches of actions into sub-batches of up to 20MB, sending any single action larger than that as a batch of one, and zips the responses together to emulate a single batch response from the Elasticsearch API. When an individual HTTP request is rejected by Elasticsearch (or by an intermediate proxy or load balancer) with an HTTP/1.1 413, we can emulate the error response for that sub-batch instead of blowing an exception through to the whole batch.
1 parent b956d1c commit 3f1bf7b
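
For orientation, a sketch (not part of the commit) of the response shape that gets emulated when a sub-batch is rejected with a 413: every action in that sub-batch is reported as an individual failure, using the same {"errors" => ..., "items" => [...]} shape the Elasticsearch Bulk API returns, so the zipped batch response still looks like a single bulk response to the caller.

    # Illustrative only: emulated bulk response for a rejected sub-batch of
    # two index actions; the action names and count are made-up examples.
    emulated = {
      "errors" => true,
      "items"  => [
        { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } },
        { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } }
      ]
    }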

File tree

lib/logstash/outputs/elasticsearch/common.rb
lib/logstash/outputs/elasticsearch/http_client.rb

2 files changed: +33 -12 lines

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 1 addition & 1 deletion

@@ -414,7 +414,7 @@ def safe_bulk(actions)
         retry unless @stopping.true?
       rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
         @bulk_request_metrics.increment(:failures)
-        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
+        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, :payload_size => e.request_body.bytesize}
         log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
         message = "Encountered a retryable error. Will Retry with exponential backoff "
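
A hypothetical example of the enriched retry-log context (only the field names come from the change above; the values are invented):

    # Hypothetical values for illustration; :payload_size is the bytesize of
    # the request body that was rejected.
    log_hash = {
      :code         => 429,
      :url          => "http://127.0.0.1:9200/",
      :payload_size => 20_971_520
    }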

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 32 additions & 11 deletions

@@ -114,18 +114,21 @@ def bulk(actions)
         stream_writer = body_stream
       end
       bulk_responses = []
+      batch_actions = []
       bulk_actions.each do |action|
         as_json = action.is_a?(Array) ?
                     action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                     LogStash::Json.dump(action)
         as_json << "\n"
-        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
-          bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
+        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES && body_stream.size > 0
+          bulk_responses << bulk_send(body_stream, batch_actions)
+          batch_actions.clear
         end
         stream_writer.write(as_json)
+        batch_actions << action
       end
       stream_writer.close if http_compression
-      bulk_responses << bulk_send(body_stream) if body_stream.size > 0
+      bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
       body_stream.close if !http_compression
       join_bulk_responses(bulk_responses)
     end
@@ -137,25 +140,43 @@ def join_bulk_responses(bulk_responses)
       }
     end

-    def bulk_send(body_stream)
+    def bulk_send(body_stream, batch_actions)
+      logger.trace("Sending bulk request to Elasticsearch", :action_count => batch_actions.size, :content_length => body_stream.size)
       params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
-      # Discard the URL
       response = @pool.post(@bulk_path, params, body_stream.string)
-      if !body_stream.closed?
-        body_stream.truncate(0)
-        body_stream.seek(0)
-      end

       @bulk_response_metrics.increment(response.code.to_s)

-      if response.code != 200
+      case response.code
+      when 200 # OK
+        LogStash::Json.load(response.body)
+      when 413 # Entity Too Large
+        logger.warn("Received HTTP/1.1 status `413 Payload Too Large` sending #{batch_actions.size} actions (#{body_stream.size} bytes)")
+        emulate_batch_error_response(actions, response.code, 'payload_too_large')
+      else
         url = ::LogStash::Util::SafeURI.new(response.final_url)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
           response.code, url, body_stream.to_s, response.body
         )
       end
+    ensure
+      if !body_stream.closed?
+        body_stream.truncate(0)
+        body_stream.seek(0)
+      end
+    end

-      LogStash::Json.load(response.body)
+    def emulate_batch_error_response(actions, http_code, reason)
+      {
+        "errors" => true,
+        "items" => actions.map do |action|
+          action = action.first if action.is_a?(Array)
+          request_action, request_parameters = action.first
+          {
+            request_action => {"status" => http_code, "error" => { "type" => reason }}
+          }
+        end
+      }
     end

     def get(path)
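
To see the end-to-end effect, a rough sketch (not from the repository; it assumes join_bulk_responses simply concatenates the per-sub-batch items and ORs their errors flags) of a batch split into two sub-batches where only the second is rejected with a 413:

    # Sketch only: zipping one successful and one emulated sub-batch response.
    ok_response = {
      "errors" => false,
      "items"  => [{ "index" => { "status" => 201 } }]
    }
    rejected_response = {          # emulated by bulk_send for the 413'd sub-batch
      "errors" => true,
      "items"  => [{ "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } }]
    }

    responses = [ok_response, rejected_response]
    joined = {
      "errors" => responses.any? { |r| r["errors"] },
      "items"  => responses.flat_map { |r| r.fetch("items", []) }
    }

The caller still receives one item per action across the whole batch, so only the actions in the oversized sub-batch surface as failures instead of the entire batch being aborted by an exception.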

0 commit comments
