lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.0.10 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.0.11

- old
+ new

@@ -1,9 +1,9 @@ # encoding: utf-8 require "logstash/namespace" -require "thread_safe" -require "concurrent" +require "concurrent/atomic/atomic_boolean" +require "concurrent/map" # This class is a Codec duck type # Using Composition, it maps from a stream identity to # a cloned codec instance via the same API as a Codec # it implements the codec public API @@ -46,16 +46,19 @@ end def start return self if running? @running.make_true - @thread = Thread.new() do + @thread = Thread.start do + class_name = @listener.class.name.split('::').last # IdentityMapCodec + LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}") + while running? do sleep @interval break if !running? - break if @listener.nil? - @listener.send(@method_symbol) + break if (listener = @listener).nil? + listener.send(@method_symbol) end end self end @@ -64,13 +67,18 @@ end def stop return if !running? @running.make_false - if @thread.alive? - @thread.wakeup - @thread.join + while @thread.alive? + begin + @thread.wakeup + rescue ThreadError + # thread might drop dead since the alive? check + else + @thread.join(0.1) # raises $! if there was any + end end @listener = nil end end @@ -107,10 +115,10 @@ attr_accessor :base_codec, :cleaner, :auto_flusher def initialize(codec) @base_codec = codec @base_codecs = [codec] - @identity_map = ThreadSafe::Hash.new &method(:codec_builder) + @identity_map = Concurrent::Hash.new &method(:codec_builder) @max_identities = MAX_IDENTITIES @evict_timeout = EVICT_TIMEOUT cleaner_interval(CLEANER_INTERVAL) if codec.respond_to?(:use_mapper_auto_flush) && (@auto_flush_interval = codec.use_mapper_auto_flush)