diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index f1e7405..c87cc72 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -325,7 +325,7 @@ def register else @watcher_class = FileWatch::ObservingRead end - @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) + @base_codec = @codec # TODO is there a way register would be called after run (on restart?) @completely_stopped = Concurrent::AtomicBoolean.new @queue = Concurrent::AtomicReference.new @@ -345,11 +345,12 @@ def listener_for(path) end def start_processing - # if the pipeline restarts this input, - # make sure previous files are closed - stop + # if the pipeline restarts this input, make sure previous files are closed + quit_watcher + @codec.close if @codec.is_a?(LogStash::Codecs::IdentityMapCodec) @watcher = @watcher_class.new(@filewatch_config) + @codec = LogStash::Codecs::IdentityMapCodec.new(@base_codec) @completed_file_handlers = [] if read_mode? @@ -386,7 +387,7 @@ def post_process_this(event, path) def handle_deletable_path(path) return if tail_mode? return if @completed_file_handlers.empty? - @logger.debug? && @logger.debug(__method__.to_s, :path => path) + @logger.trace? && @logger.trace(__method__.to_s, :path => path) @completed_file_handlers.each { |handler| handler.handle(path) } end @@ -394,11 +395,11 @@ def log_line_received(path, line) @logger.debug? && @logger.debug("Received line", :path => path, :text => line) end + # @override LogStash::Inputs::Base#stop def stop - unless @watcher.nil? - @codec.close - @watcher.quit - end + @logger.trace? && @logger.trace(__method__.to_s, @path) + quit_watcher + @codec.close if @codec end # @private used in specs @@ -408,6 +409,10 @@ def queue private + def quit_watcher + @watcher.quit if @watcher + end + def build_sincedb_base_from_settings(settings) logstash_data_path = settings.get_value("path.data") Pathname.new(logstash_data_path).join("plugins", "inputs", "file").tap do |path| @@ -423,7 +428,7 @@ def attempt_set(event, field_reference, value) event.set(field_reference, value) rescue => e - logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.message) + logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.class, :message => e.message) false end @@ -456,6 +461,7 @@ def read_mode? end def exit_flush + @logger.trace? && @logger.trace(__method__.to_s, @path) listener = FlushableListener.new("none", self) if @codec.identity_count.zero? # using the base codec without identity/path info @@ -463,7 +469,7 @@ def exit_flush begin listener.process_event(event) rescue => e - @logger.error("File Input: flush on exit downstream error", :exception => e) + @logger.error("flush on exit downstream error", :exception => e.class, :message => e.message) end end else diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index b5cf6c3..a478bf1 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -23,13 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-plain' - - if RUBY_VERSION.start_with?("1") - s.add_runtime_dependency 'rake', '~> 12.2.0' - s.add_runtime_dependency 'addressable', '~> 2.4.0' - else - s.add_runtime_dependency 'addressable' - end + s.add_runtime_dependency 'addressable' s.add_runtime_dependency 'concurrent-ruby', '~> 1.0' s.add_runtime_dependency 'logstash-codec-multiline', ['~> 3.0'] diff --git a/spec/inputs/file_tail_spec.rb b/spec/inputs/file_tail_spec.rb index fecb904..ca3905d 100644 --- a/spec/inputs/file_tail_spec.rb +++ b/spec/inputs/file_tail_spec.rb @@ -391,7 +391,7 @@ end end .then("assert both files are mapped as identities and stop") do - wait(2).for {subject.codec.identity_count}.to eq(2), "both files are not mapped as identities" + wait(2).for { subject.codec.identity_count }.to eq(2), "both files are not mapped as identities" end .then("stop") do subject.stop @@ -473,13 +473,13 @@ it "collects line events from only one file" do actions = RSpec::Sequencing .run("assert one identity is mapped") do - wait(0.4).for{subject.codec.identity_count}.to be > 0, "no identity is mapped" + wait(0.4).for{ subject.codec.respond_to?(:identity_count) ? subject.codec.identity_count : 0 }.to be > 0, "no identity is mapped" end .then("stop") do subject.stop end .then("stop flushes last event") do - wait(0.4).for{events.size}.to eq(2), "events size does not equal 2" + wait(0.4).for{ events.size }.to eq(2), "events size does not equal 2" end subject.run(events) # wait for actions future value @@ -508,7 +508,7 @@ it "collects line events from both files" do actions = RSpec::Sequencing .run("assert both identities are mapped and the first two events are built") do - wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built" + wait(0.4).for{ events.size == 2 && subject.codec.identity_count == 1 }.to eq(true), "both identities are not mapped and the first two events are not built" end .then("wait for close to flush last event of each identity") do wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity"