From c4c088209b2581b14665b1fa8912c3a77ca6a992 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 28 Sep 2021 00:01:15 +0000 Subject: [PATCH 1/2] eliminate re-use of codecs across identities Some Logstash codecs (like CSV) are stateful and cannot be reused across multiple input streams. This change removes a minor optimization, in which the base codec is repeatedly reused for the first concurrent accessor, despite the identities for the identity map not matching previous usages. Instead, when an identity is provided, we ensure that a _clone_ of the base codec is provided for each distinct identity up until eviction, and that the clone is not reused after its eviction flush. --- CHANGELOG.md | 3 +++ lib/logstash/codecs/identity_map_codec.rb | 3 ++- logstash-codec-multiline.gemspec | 2 +- spec/codecs/identity_map_codec_spec.rb | 23 ++++++++++++++++------- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 886c809..d67770c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.1.1 + - Fix: avoid reusing per-identity codec instances for differing identities. Removes a very minor optimization so that stateful codecs like CSV can work reliably. + ## 3.1.0 - Feat: ECS compatibility [#69](https://github.com/logstash-plugins/logstash-codec-multiline/pull/69) diff --git a/lib/logstash/codecs/identity_map_codec.rb b/lib/logstash/codecs/identity_map_codec.rb index 62fc2d1..fbe694e 100644 --- a/lib/logstash/codecs/identity_map_codec.rb +++ b/lib/logstash/codecs/identity_map_codec.rb @@ -290,6 +290,7 @@ def logger end def codec_without_usage_update(identity) + return base_codec if identity.nil? # mirror optimization in `stream_codec` find_codec_value(identity).codec end @@ -336,7 +337,7 @@ def check_map_limits end def codec_builder(hash, k) - codec = hash.empty? ? @base_codec : @base_codec.clone + codec = @base_codec.clone codec.use_mapper_auto_flush if using_mapped_auto_flush? 
compo = CodecValue.new(codec).tap do |o| now = Time.now diff --git a/logstash-codec-multiline.gemspec b/logstash-codec-multiline.gemspec index 3bc6dd9..37755d4 100644 --- a/logstash-codec-multiline.gemspec +++ b/logstash-codec-multiline.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-multiline' - s.version = '3.1.0' + s.version = '3.1.1' s.licenses = ['Apache License (2.0)'] s.summary = "Merges multiline messages into a single event" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/codecs/identity_map_codec_spec.rb b/spec/codecs/identity_map_codec_spec.rb index e02b484..33d1bda 100644 --- a/spec/codecs/identity_map_codec_spec.rb +++ b/spec/codecs/identity_map_codec_spec.rb @@ -29,7 +29,8 @@ let(:stream1) { nil } it "transparently refers to the original codec" do - expect(codec).to eql(codec1) + resolved_codec = demuxer.codec_without_usage_update(stream1) + expect(resolved_codec).to eq(codec) end end @@ -37,8 +38,8 @@ before { demuxer.decode(arg1, stream1) } - it "the first identity refers to the original codec" do - expect(codec).to eql(codec1) + it "the first identity refers to a copy of original codec" do + expect(codec1).to_not eql(codec) end end @@ -212,8 +213,11 @@ let(:demuxer) { described_class.new(codec).evict_timeout(1).cleaner_interval(1) } it "the cleaner evicts the codec and flushes it first" do demuxer.decode(Object.new, "stream1"){|*| 42} + mapped_codec = demuxer.codec_without_usage_update("stream1") # get before eviction + sleep(2.1) - expect(codec.trace_for(:flush)).to eq(42) + + expect(mapped_codec.trace_for(:flush)).to eq(42) expect(demuxer.identity_map.keys).not_to include("stream1") end end @@ -227,9 +231,12 @@ end it "the cleaner evicts the codec and flushes it first using the eviction_block" do demuxer.decode(Object.new, "stream1"){|*| 42} + 
mapped_codec = demuxer.codec_without_usage_update("stream1") # get before eviction + sleep(2.1) - expect(codec.trace_for(:flush)).to eq(24) - expect(demuxer.identity_map.keys).not_to include("stream1") + + expect(mapped_codec.trace_for(:flush)).to eq(24) + expect(demuxer.identity_map.keys).to_not include("stream1") end end end @@ -251,7 +258,9 @@ it "no events are generated (the line is buffered)" do expect(imc.identity_count).to eq(1) expect(queue.size).to eq(0) - expect(mlc.internal_buffer[0]).to eq("foo") + + mapped_codec = imc.codec_without_usage_update(identity) + expect(mapped_codec.internal_buffer[0]).to eq("foo") end end From 2a40c2165ffb0570f1c0fbdc90946e02254f18ac Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 28 Sep 2021 19:55:47 +0000 Subject: [PATCH 2/2] add link to PR in changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d67770c..efc6f1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## 3.1.1 - - Fix: avoid reusing per-identity codec instances for differing identities. Removes a very minor optimization so that stateful codecs like CSV can work reliably. + - Fix: avoid reusing per-identity codec instances for differing identities. Removes a very minor optimization so that stateful codecs like CSV can work reliably [#70](https://github.com/logstash-plugins/logstash-codec-multiline/pull/70) ## 3.1.0 - Feat: ECS compatibility [#69](https://github.com/logstash-plugins/logstash-codec-multiline/pull/69)