lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.1.1 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.1.2

- old
+ new

@@ -40,19 +40,24 @@ class PeriodicRunner def initialize(listener, interval, method_symbol) @listener, @interval = listener, interval @method_symbol = method_symbol - @running = Concurrent::AtomicBoolean.new(false) + @running = Concurrent::AtomicBoolean.new(true) end + # The goal of periodic runner is to have a single thread to do clean up / auto flush + # This method is expected to access by a single thread, + # otherwise, multiple Thread.start could create more than one periodic runner def start - return self if running? - @running.make_true + return self unless running? + return self if running? && !@thread.nil? + @thread = Thread.start do class_name = @listener.class.name.split('::').last # IdentityMapCodec LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}") + @listener.logger.debug("Start periodic runner") while running? do sleep @interval break if !running? break if (listener = @listener).nil? @@ -65,13 +70,15 @@ def running? @running.true? end def stop - return if !running? + return unless running? + + @listener.logger.debug("Stop periodic runner") @running.make_false - while @thread.alive? + while @thread&.alive? begin @thread.wakeup rescue ThreadError # thread might drop dead since the alive? check else @@ -311,10 +318,10 @@ # for nil stream this method is not called def record_codec_usage(identity) check_map_limits # only start the cleaner if streams are in use - # continuous calls to start are OK + # continuous calls to start are OK if this codec method is accessed by a single thread cleaner.start auto_flusher.start compo = find_codec_value(identity) now = Time.now compo.eviction_timeout = eviction_timestamp(now)