Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.1.1
- Fix: avoid reusing per-identity codec instances for differing identities. Removes a very minor optimization so that stateful codecs like CSV can work reliably [#70](https://github.com/logstash-plugins/logstash-codec-multiline/pull/70)

## 3.1.0
- Feat: ECS compatibility [#69](https://github.com/logstash-plugins/logstash-codec-multiline/pull/69)

Expand Down
3 changes: 2 additions & 1 deletion lib/logstash/codecs/identity_map_codec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def logger
end

def codec_without_usage_update(identity)
return base_codec if identity.nil? # mirror optimization in `stream_codec`
find_codec_value(identity).codec
end

Expand Down Expand Up @@ -336,7 +337,7 @@ def check_map_limits
end

def codec_builder(hash, k)
codec = hash.empty? ? @base_codec : @base_codec.clone
codec = @base_codec.clone
codec.use_mapper_auto_flush if using_mapped_auto_flush?
compo = CodecValue.new(codec).tap do |o|
now = Time.now
Expand Down
2 changes: 1 addition & 1 deletion logstash-codec-multiline.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-multiline'
s.version = '3.1.0'
s.version = '3.1.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Merges multiline messages into a single event"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
23 changes: 16 additions & 7 deletions spec/codecs/identity_map_codec_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@
let(:stream1) { nil }

it "transparently refers to the original codec" do
expect(codec).to eql(codec1)
resolved_codec = demuxer.codec_without_usage_update(stream1)
expect(resolved_codec).to eq(codec)
end
end

describe "operating with stream identity" do

before { demuxer.decode(arg1, stream1) }

it "the first identity refers to the original codec" do
expect(codec).to eql(codec1)
it "the first identity refers to a copy of original codec" do
expect(codec1).to_not eql(codec)
end
end

Expand Down Expand Up @@ -212,8 +213,11 @@
let(:demuxer) { described_class.new(codec).evict_timeout(1).cleaner_interval(1) }
it "the cleaner evicts the codec and flushes it first" do
demuxer.decode(Object.new, "stream1"){|*| 42}
mapped_codec = demuxer.codec_without_usage_update("stream1") # get before eviction

sleep(2.1)
expect(codec.trace_for(:flush)).to eq(42)

expect(mapped_codec.trace_for(:flush)).to eq(42)
expect(demuxer.identity_map.keys).not_to include("stream1")
end
end
Expand All @@ -227,9 +231,12 @@
end
it "the cleaner evicts the codec and flushes it first using the eviction_block" do
demuxer.decode(Object.new, "stream1"){|*| 42}
mapped_codec = demuxer.codec_without_usage_update("stream1") # get before eviction

sleep(2.1)
expect(codec.trace_for(:flush)).to eq(24)
expect(demuxer.identity_map.keys).not_to include("stream1")

expect(mapped_codec.trace_for(:flush)).to eq(24)
expect(demuxer.identity_map.keys).to_not include("stream1")
end
end
end
Expand All @@ -251,7 +258,9 @@
it "no events are generated (the line is buffered)" do
expect(imc.identity_count).to eq(1)
expect(queue.size).to eq(0)
expect(mlc.internal_buffer[0]).to eq("foo")

mapped_codec = imc.codec_without_usage_update(identity)
expect(mapped_codec.internal_buffer[0]).to eq("foo")
end
end

Expand Down