Skip to content

Commit 9401759

Browse files
authored
Feat: assert returned item count from _bulk (#997)
we've seen some weird behavior on ES 7.10.2 (but also with 7.11.1) where a _bulk request to index 69 documents returned 135 entries this leads to an ugly `NoMethodError: undefined method ``[]' for nil:NilClass` resolves #989
1 parent 4a1f1a5 commit 9401759

File tree

4 files changed

+65
-2
lines changed

4 files changed

+65
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 10.8.5
2+
- Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)
3+
14
## 10.8.4
25
- Fixed an issue where a retried request would drop "update" parameters [#800](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/800)
36

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,16 @@ def submit(actions)
204204
return
205205
end
206206

207+
responses = bulk_response["items"]
208+
if responses.size != actions.size # can not map action -> response reliably
209+
# an ES bug (on 7.10.2, 7.11.1) where a _bulk request to index X documents would return Y (> X) items
210+
msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses"
211+
@logger.warn(msg, actions: actions, responses: responses)
212+
fail("#{msg} (likely a bug with _bulk endpoint)")
213+
end
214+
207215
actions_to_retry = []
208-
bulk_response["items"].each_with_index do |response,idx|
216+
responses.each_with_index do |response,idx|
209217
action_type, action_props = response.first
210218

211219
status = action_props["status"]

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '10.8.4'
3+
s.version = '10.8.5'
44

55
s.licenses = ['apache-2.0']
66
s.summary = "Stores logs in Elasticsearch"

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,58 @@
296296
expect(subject.logger).to have_received(:debug).with(/Encountered a retryable error/i, anything)
297297
end
298298
end
299+
300+
context "unexpected bulk response" do
301+
let(:options) do
302+
{ "hosts" => "127.0.0.1:9999", "index" => "%{foo}", "manage_template" => false }
303+
end
304+
305+
let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] }
306+
307+
let(:bulk_response) do
308+
# shouldn't really happen but we've seen this happen - here ES returns more items than were sent
309+
{ "took"=>1, "ingest_took"=>9, "errors"=>true,
310+
"items"=>[{"index"=>{"_index"=>"bar1", "_type"=>"_doc", "_id"=>nil, "status"=>500,
311+
"error"=>{"type" => "illegal_state_exception",
312+
"reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]"
313+
}
314+
}
315+
},
316+
# NOTE: this is an artificial success (usually everything fails with a 500) but even if some doc where
317+
# to succeed due the unexpected reponse items we can not clearly identify which actions to retry ...
318+
{"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>201}},
319+
{"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>500,
320+
"error"=>{"type" => "illegal_state_exception",
321+
"reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]];"
322+
}
323+
}
324+
}]
325+
}
326+
end
327+
328+
before(:each) do
329+
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
330+
expect( stream.string ).to include '"foo":"bar1"'
331+
expect( stream.string ).to include '"foo":"bar2"'
332+
end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
333+
end
334+
335+
it "should retry submit" do
336+
allow(subject.logger).to receive(:error).with(/Encountered an unexpected error/i, anything)
337+
allow(subject.client).to receive(:bulk).and_call_original # track count
338+
339+
subject.multi_receive(events)
340+
341+
expect(subject.client).to have_received(:bulk).twice
342+
end
343+
344+
it "should log specific error message" do
345+
expect(subject.logger).to receive(:error).with(/Encountered an unexpected error/i,
346+
hash_including(:error_message => 'Sent 2 documents but Elasticsearch returned 3 responses (likely a bug with _bulk endpoint)'))
347+
348+
subject.multi_receive(events)
349+
end
350+
end
299351
end
300352

301353
context "with timeout set" do

0 commit comments

Comments
 (0)