lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.1.1 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.1.2
- old
+ new
@@ -40,19 +40,24 @@
class PeriodicRunner
def initialize(listener, interval, method_symbol)
@listener, @interval = listener, interval
@method_symbol = method_symbol
- @running = Concurrent::AtomicBoolean.new(false)
+ @running = Concurrent::AtomicBoolean.new(true)
end
+ # The goal of periodic runner is to have a single thread to do clean up / auto flush
+ # This method is expected to access by a single thread,
+ # otherwise, multiple Thread.start could create more than one periodic runner
def start
- return self if running?
- @running.make_true
+ return self unless running?
+ return self if running? && !@thread.nil?
+
@thread = Thread.start do
class_name = @listener.class.name.split('::').last # IdentityMapCodec
LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}")
+ @listener.logger.debug("Start periodic runner")
while running? do
sleep @interval
break if !running?
break if (listener = @listener).nil?
@@ -65,13 +70,15 @@
def running?
@running.true?
end
def stop
- return if !running?
+ return unless running?
+
+ @listener.logger.debug("Stop periodic runner")
@running.make_false
- while @thread.alive?
+ while @thread&.alive?
begin
@thread.wakeup
rescue ThreadError
# thread might drop dead since the alive? check
else
@@ -311,10 +318,10 @@
# for nil stream this method is not called
def record_codec_usage(identity)
check_map_limits
# only start the cleaner if streams are in use
- # continuous calls to start are OK
+ # continuous calls to start are OK if this codec method is accessed by a single thread
cleaner.start
auto_flusher.start
compo = find_codec_value(identity)
now = Time.now
compo.eviction_timeout = eviction_timestamp(now)