Skip to content

Commit 1891b34

Browse files
authored
Fix failing tests (#309)
There are two failures that this PR fixes: - tests that referred host or path attribute running against Logstash 8, which by default has ECS enabled. The fix consisted in surrounding the test with the ECS harness and use the ecs_select to refer attributes with proper name. - another fix regards the test having timed_out, the codec is auto flushed. It presented 2 problems: - the first is that to request the subject.stop to trigger the auto_flush call (called by exit_flush method). - the second regard a regression introduced with eliminate PR logstash-plugins/logstash-codec-multiline#70. The test worked because the the IdentityMapCodec used a shared codec. When this behavior was removed the test, that engage multiple threads, exposed a problem of overwriting of codec reference. This overwriting drove to the lost of data, used in test logic. The fix consists in clone redefinition to make it almost a singleton.
1 parent 1799701 commit 1891b34

File tree

2 files changed

+174
-138
lines changed

2 files changed

+174
-138
lines changed

spec/helpers/spec_helper.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def close
5757
@tracer.push [:close, true]
5858
end
5959
def clone
60-
self.class.new
60+
self
6161
end
6262
end
6363
end

spec/inputs/file_tail_spec.rb

Lines changed: 173 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -157,39 +157,49 @@
157157
end
158158
end
159159

160-
context "running the input twice" do
161-
let(:name) { "D" }
162-
it "should read old files" do
163-
conf = <<-CONFIG
164-
input {
165-
file {
166-
type => "blah"
167-
path => "#{path_path}"
168-
start_position => "beginning"
169-
codec => "json"
170-
}
171-
}
172-
CONFIG
160+
context "running the input twice", :ecs_compatibility_support do
161+
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
173162

174-
File.open(tmpfile_path, "w") do |fd|
175-
fd.puts('{"path": "my_path", "host": "my_host"}')
176-
fd.puts('{"my_field": "my_val"}')
177-
fd.fsync
163+
before(:each) do
164+
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
178165
end
179-
# arbitrary old file (2 days)
180-
FileInput.make_file_older(tmpfile_path, 48 * 60 * 60)
166+
167+
let(:file_path_target_field ) { ecs_select[disabled: "path", v1: '[log][file][path]'] }
168+
let(:source_host_target_field) { ecs_select[disabled: "host", v1: '[host][name]'] }
169+
170+
let(:name) { "D" }
171+
it "should read old files" do
172+
conf = <<-CONFIG
173+
input {
174+
file {
175+
type => "blah"
176+
path => "#{path_path}"
177+
start_position => "beginning"
178+
codec => "json"
179+
}
180+
}
181+
CONFIG
181182

182-
events = input(conf) do |pipeline, queue|
183-
2.times.collect { queue.pop }
183+
File.open(tmpfile_path, "w") do |fd|
184+
fd.puts('{"path": "my_path", "host": "my_host"}')
185+
fd.puts('{"my_field": "my_val"}')
186+
fd.fsync
187+
end
188+
# arbitrary old file (2 days)
189+
FileInput.make_file_older(tmpfile_path, 48 * 60 * 60)
190+
191+
events = input(conf) do |pipeline, queue|
192+
2.times.collect { queue.pop }
193+
end
194+
existing_path_index, added_path_index = "my_val" == events[0].get("my_field") ? [1,0] : [0,1]
195+
expect(events[existing_path_index].get("path")).to eq "my_path"
196+
expect(events[existing_path_index].get("host")).to eq "my_host"
197+
expect(events[existing_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
198+
199+
expect(events[added_path_index].get(file_path_target_field)).to eq "#{tmpfile_path}"
200+
expect(events[added_path_index].get(source_host_target_field)).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
201+
expect(events[added_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
184202
end
185-
existing_path_index, added_path_index = "my_val" == events[0].get("my_field") ? [1,0] : [0,1]
186-
expect(events[existing_path_index].get("path")).to eq "my_path"
187-
expect(events[existing_path_index].get("host")).to eq "my_host"
188-
expect(events[existing_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
189-
190-
expect(events[added_path_index].get("path")).to eq "#{tmpfile_path}"
191-
expect(events[added_path_index].get("host")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
192-
expect(events[added_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
193203
end
194204
end
195205

@@ -233,54 +243,62 @@
233243
FileUtils.rm_rf(sincedb_path)
234244
end
235245

236-
context "when data exists and then more data is appended" do
237-
subject { described_class.new(conf) }
246+
context "when data exists and then more data is appended", :ecs_compatibility_support do
247+
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
238248

239-
before do
240-
File.open(tmpfile_path, "w") do |fd|
241-
fd.puts("ignore me 1")
242-
fd.puts("ignore me 2")
243-
fd.fsync
249+
before(:each) do
250+
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
244251
end
245-
mlconf.update("pattern" => "^\s", "what" => "previous")
246-
conf.update("type" => "blah",
247-
"path" => path_path,
248-
"sincedb_path" => sincedb_path,
249-
"stat_interval" => 0.1,
250-
"codec" => mlcodec,
251-
"delimiter" => TEST_FILE_DELIMITER)
252-
end
253252

254-
it "reads the appended data only" do
255-
subject.register
256-
actions = RSpec::Sequencing
257-
.run_after(1, "append two lines after delay") do
258-
File.open(tmpfile_path, "a") { |fd| fd.puts("hello"); fd.puts("world") }
259-
end
260-
.then("wait for one event") do
261-
wait(0.75).for{events.size}.to eq(1)
262-
end
263-
.then("quit") do
264-
subject.stop
265-
end
266-
.then("wait for flushed event") do
267-
wait(0.75).for{events.size}.to eq(2)
253+
let(:file_path_target_field ) { ecs_select[disabled: "path", v1: '[log][file][path]'] }
254+
subject { described_class.new(conf) }
255+
256+
before do
257+
File.open(tmpfile_path, "w") do |fd|
258+
fd.puts("ignore me 1")
259+
fd.puts("ignore me 2")
260+
fd.fsync
268261
end
262+
mlconf.update("pattern" => "^\s", "what" => "previous")
263+
conf.update("type" => "blah",
264+
"path" => path_path,
265+
"sincedb_path" => sincedb_path,
266+
"stat_interval" => 0.1,
267+
"codec" => mlcodec,
268+
"delimiter" => TEST_FILE_DELIMITER)
269+
end
269270

270-
subject.run(events)
271-
actions.assert_no_errors
271+
it "reads the appended data only" do
272+
subject.register
273+
actions = RSpec::Sequencing
274+
.run_after(1, "append two lines after delay") do
275+
File.open(tmpfile_path, "a") { |fd| fd.puts("hello"); fd.puts("world") }
276+
end
277+
.then("wait for one event") do
278+
wait(0.75).for{events.size}.to eq(1)
279+
end
280+
.then("quit") do
281+
subject.stop
282+
end
283+
.then("wait for flushed event") do
284+
wait(0.75).for{events.size}.to eq(2)
285+
end
272286

273-
event1 = events[0]
274-
expect(event1).not_to be_nil
275-
expect(event1.get("path")).to eq tmpfile_path
276-
expect(event1.get("[@metadata][path]")).to eq tmpfile_path
277-
expect(event1.get("message")).to eq "hello"
278-
279-
event2 = events[1]
280-
expect(event2).not_to be_nil
281-
expect(event2.get("path")).to eq tmpfile_path
282-
expect(event2.get("[@metadata][path]")).to eq tmpfile_path
283-
expect(event2.get("message")).to eq "world"
287+
subject.run(events)
288+
actions.assert_no_errors
289+
290+
event1 = events[0]
291+
expect(event1).not_to be_nil
292+
expect(event1.get(file_path_target_field)).to eq tmpfile_path
293+
expect(event1.get("[@metadata][path]")).to eq tmpfile_path
294+
expect(event1.get("message")).to eq "hello"
295+
296+
event2 = events[1]
297+
expect(event2).not_to be_nil
298+
expect(event2.get(file_path_target_field)).to eq tmpfile_path
299+
expect(event2.get("[@metadata][path]")).to eq tmpfile_path
300+
expect(event2.get("message")).to eq "world"
301+
end
284302
end
285303
end
286304

@@ -311,12 +329,21 @@
311329
.then_after(0.1, "identity is mapped") do
312330
wait(0.75).for{subject.codec.identity_map[tmpfile_path]}.not_to be_nil, "identity is not mapped"
313331
end
314-
.then("wait for auto_flush") do
315-
wait(0.75).for{subject.codec.identity_map[tmpfile_path].codec.trace_for(:auto_flush)}.to eq([true]), "autoflush didn't"
332+
.then("wait accept") do
333+
wait(0.75).for {
334+
subject.codec.identity_map[tmpfile_path].codec.trace_for(:accept)
335+
}.to eq([true]), "accept didn't"
316336
end
317-
.then("quit") do
337+
.then("request a stop") do
338+
# without this the subject.run doesn't invokes the #exit_flush which is the only @codec.flush_mapped invocation
318339
subject.stop
319340
end
341+
.then("wait for auto_flush") do
342+
wait(2).for {
343+
subject.codec.identity_map[tmpfile_path].codec.trace_for(:auto_flush)
344+
.reduce {|b1, b2| b1 and b2} # there could be multiple instances of same call, e.g. [[:accept, true], [:auto_flush, true], [:close, true], [:auto_flush, true]]
345+
}.to eq(true), "autoflush didn't"
346+
end
320347
subject.run(events)
321348
actions.assert_no_errors
322349
expect(subject.codec.identity_map[tmpfile_path].codec.trace_for(:accept)).to eq([true])
@@ -356,85 +383,94 @@
356383
end
357384
end
358385

359-
context "when wildcard path and a multiline codec is specified" do
360-
subject { described_class.new(conf) }
361-
let(:suffix) { "J" }
362-
let(:tmpfile_path2) { ::File.join(tmpdir_path, "K.txt") }
363-
before do
364-
mlconf.update("pattern" => "^\s", "what" => "previous")
365-
conf.update(
366-
"type" => "blah",
367-
"path" => path_path,
368-
"start_position" => "beginning",
369-
"sincedb_path" => sincedb_path,
370-
"stat_interval" => 0.05,
371-
"codec" => mlcodec,
372-
"file_sort_by" => "path",
373-
"delimiter" => TEST_FILE_DELIMITER)
386+
context "when wildcard path and a multiline codec is specified", :ecs_compatibility_support do
387+
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
374388

375-
subject.register
376-
end
389+
before(:each) do
390+
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
391+
end
377392

378-
it "collects separate multiple line events from each file" do
379-
subject
380-
actions = RSpec::Sequencing
381-
.run_after(0.1, "create files") do
382-
File.open(tmpfile_path, "wb") do |fd|
383-
fd.puts("line1.1-of-J")
384-
fd.puts(" line1.2-of-J")
385-
fd.puts(" line1.3-of-J")
386-
end
387-
File.open(tmpfile_path2, "wb") do |fd|
388-
fd.puts("line1.1-of-K")
389-
fd.puts(" line1.2-of-K")
390-
fd.puts(" line1.3-of-K")
391-
end
392-
end
393-
.then("assert both files are mapped as identities and stop") do
394-
wait(2).for {subject.codec.identity_count}.to eq(2), "both files are not mapped as identities"
395-
end
396-
.then("stop") do
397-
subject.stop
398-
end
399-
subject.run(events)
400-
# wait for actions to complete
401-
actions.assert_no_errors
402-
expect(events.size).to eq(2)
403-
e1, e2 = events
404-
e1_message = e1.get("message")
405-
e2_message = e2.get("message")
406-
407-
expect(e1.get("path")).to match(/J.txt/)
408-
expect(e2.get("path")).to match(/K.txt/)
409-
expect(e1_message).to eq("line1.1-of-J#{TEST_FILE_DELIMITER} line1.2-of-J#{TEST_FILE_DELIMITER} line1.3-of-J")
410-
expect(e2_message).to eq("line1.1-of-K#{TEST_FILE_DELIMITER} line1.2-of-K#{TEST_FILE_DELIMITER} line1.3-of-K")
411-
end
393+
let(:file_path_target_field ) { ecs_select[disabled: "path", v1: '[log][file][path]'] }
412394

413-
context "if auto_flush is enabled on the multiline codec" do
414-
let(:mlconf) { { "auto_flush_interval" => 0.5 } }
415-
let(:suffix) { "M" }
416-
it "an event is generated via auto_flush" do
395+
subject { described_class.new(conf) }
396+
let(:suffix) { "J" }
397+
let(:tmpfile_path2) { ::File.join(tmpdir_path, "K.txt") }
398+
before do
399+
mlconf.update("pattern" => "^\s", "what" => "previous")
400+
conf.update(
401+
"type" => "blah",
402+
"path" => path_path,
403+
"start_position" => "beginning",
404+
"sincedb_path" => sincedb_path,
405+
"stat_interval" => 0.05,
406+
"codec" => mlcodec,
407+
"file_sort_by" => "path",
408+
"delimiter" => TEST_FILE_DELIMITER)
409+
410+
subject.register
411+
end
412+
413+
it "collects separate multiple line events from each file" do
414+
subject
417415
actions = RSpec::Sequencing
418416
.run_after(0.1, "create files") do
419417
File.open(tmpfile_path, "wb") do |fd|
420-
fd.puts("line1.1-of-a")
421-
fd.puts(" line1.2-of-a")
422-
fd.puts(" line1.3-of-a")
418+
fd.puts("line1.1-of-J")
419+
fd.puts(" line1.2-of-J")
420+
fd.puts(" line1.3-of-J")
421+
end
422+
File.open(tmpfile_path2, "wb") do |fd|
423+
fd.puts("line1.1-of-K")
424+
fd.puts(" line1.2-of-K")
425+
fd.puts(" line1.3-of-K")
423426
end
424427
end
425-
.then("wait for auto_flush") do
426-
wait(2).for{events.size}.to eq(1), "events size is not 1"
428+
.then("assert both files are mapped as identities and stop") do
429+
wait(2).for {subject.codec.identity_count}.to eq(2), "both files are not mapped as identities"
427430
end
428431
.then("stop") do
429432
subject.stop
430433
end
431434
subject.run(events)
432435
# wait for actions to complete
433436
actions.assert_no_errors
434-
e1 = events.first
437+
expect(events.size).to eq(2)
438+
e1, e2 = events
435439
e1_message = e1.get("message")
436-
expect(e1_message).to eq("line1.1-of-a#{TEST_FILE_DELIMITER} line1.2-of-a#{TEST_FILE_DELIMITER} line1.3-of-a")
437-
expect(e1.get("path")).to match(/M.txt$/)
440+
e2_message = e2.get("message")
441+
442+
expect(e1.get(file_path_target_field)).to match(/J.txt/)
443+
expect(e2.get(file_path_target_field)).to match(/K.txt/)
444+
expect(e1_message).to eq("line1.1-of-J#{TEST_FILE_DELIMITER} line1.2-of-J#{TEST_FILE_DELIMITER} line1.3-of-J")
445+
expect(e2_message).to eq("line1.1-of-K#{TEST_FILE_DELIMITER} line1.2-of-K#{TEST_FILE_DELIMITER} line1.3-of-K")
446+
end
447+
448+
context "if auto_flush is enabled on the multiline codec" do
449+
let(:mlconf) { { "auto_flush_interval" => 0.5 } }
450+
let(:suffix) { "M" }
451+
it "an event is generated via auto_flush" do
452+
actions = RSpec::Sequencing
453+
.run_after(0.1, "create files") do
454+
File.open(tmpfile_path, "wb") do |fd|
455+
fd.puts("line1.1-of-a")
456+
fd.puts(" line1.2-of-a")
457+
fd.puts(" line1.3-of-a")
458+
end
459+
end
460+
.then("wait for auto_flush") do
461+
wait(2).for{events.size}.to eq(1), "events size is not 1"
462+
end
463+
.then("stop") do
464+
subject.stop
465+
end
466+
subject.run(events)
467+
# wait for actions to complete
468+
actions.assert_no_errors
469+
e1 = events.first
470+
e1_message = e1.get("message")
471+
expect(e1_message).to eq("line1.1-of-a#{TEST_FILE_DELIMITER} line1.2-of-a#{TEST_FILE_DELIMITER} line1.3-of-a")
472+
expect(e1.get(file_path_target_field)).to match(/M.txt$/)
473+
end
438474
end
439475
end
440476
end

0 commit comments

Comments
 (0)