lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.0.10 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-3.0.11
- old
+ new
@@ -1,9 +1,9 @@
# encoding: utf-8
require "logstash/namespace"
-require "thread_safe"
-require "concurrent"
+require "concurrent/atomic/atomic_boolean"
+require "concurrent/map"
# This class is a Codec duck type
# Using Composition, it maps from a stream identity to
# a cloned codec instance via the same API as a Codec
# it implements the codec public API
@@ -46,16 +46,19 @@
end
def start
return self if running?
@running.make_true
- @thread = Thread.new() do
+ @thread = Thread.start do
+ class_name = @listener.class.name.split('::').last # IdentityMapCodec
+ LogStash::Util.set_thread_name("#{class_name}##{@method_symbol}")
+
while running? do
sleep @interval
break if !running?
- break if @listener.nil?
- @listener.send(@method_symbol)
+ break if (listener = @listener).nil?
+ listener.send(@method_symbol)
end
end
self
end
@@ -64,13 +67,18 @@
end
def stop
return if !running?
@running.make_false
- if @thread.alive?
- @thread.wakeup
- @thread.join
+ while @thread.alive?
+ begin
+ @thread.wakeup
+ rescue ThreadError
+ # thread might drop dead since the alive? check
+ else
+ @thread.join(0.1) # raises $! if there was any
+ end
end
@listener = nil
end
end
@@ -107,10 +115,10 @@
attr_accessor :base_codec, :cleaner, :auto_flusher
def initialize(codec)
@base_codec = codec
@base_codecs = [codec]
- @identity_map = ThreadSafe::Hash.new &method(:codec_builder)
+ @identity_map = Concurrent::Hash.new &method(:codec_builder)
@max_identities = MAX_IDENTITIES
@evict_timeout = EVICT_TIMEOUT
cleaner_interval(CLEANER_INTERVAL)
if codec.respond_to?(:use_mapper_auto_flush) &&
(@auto_flush_interval = codec.use_mapper_auto_flush)