lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.7 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.8

- old
+ new

@@ -1,8 +1,9 @@ # encoding: utf-8 require "logstash/namespace" require "thread_safe" +require "concurrent" # This class is a Codec duck type # Using Composition, it maps from a stream identity to # a cloned codec instance via the same API as a Codec # it implements the codec public API @@ -35,45 +36,55 @@ :current_size => current_size, :upper_limit => limit) raise IdentityMapUpperLimitException.new end end - class MapCleaner - def initialize(imc, interval) - @imc, @interval = imc, interval - @running = false + class PeriodicRunner + def initialize(listener, interval, method_symbol) + @listener, @interval = listener, interval + @method_symbol = method_symbol + @running = Concurrent::AtomicBoolean.new(false) end def start return self if running? - @running = true - @thread = Thread.new(@imc) do |imc| - loop do + @running.make_true + @thread = Thread.new() do + while running? do sleep @interval - break if !@running - imc.map_cleanup + break if !running? + @listener.send(@method_symbol) end end self end def running? - @running + @running.value end def stop return if !running? - @running = false - @thread.wakeup if @thread.alive? + @running.make_false + if @thread.alive? + @thread.wakeup + @thread.join + end + @listener = nil end end - # A composite class to hold both the codec and the eviction_timeout + class NoopRunner + attr_reader :start, :stop + def running?() false; end + end + + # A composite class to hold both the codec, the eviction_timeout and a last_used timestamp # instances of this Value Object are stored in the mapping hash class CodecValue attr_reader :codec - attr_accessor :timeout + attr_accessor :eviction_timeout, :auto_flush_timeout def initialize(codec) @codec = codec end end @@ -90,19 +101,26 @@ # time that the cleaner thread sleeps for # before it tries to clean out stale mappings CLEANER_INTERVAL = 60 * 5 # 5 minutes attr_reader :identity_map - attr_accessor :base_codec, :cleaner + attr_accessor :base_codec, :cleaner, :auto_flusher def initialize(codec) @base_codec = codec @base_codecs = [codec] @identity_map = ThreadSafe::Hash.new &method(:codec_builder) @max_identities = MAX_IDENTITIES @evict_timeout = EVICT_TIMEOUT - @cleaner = MapCleaner.new(self, CLEANER_INTERVAL) + cleaner_interval(CLEANER_INTERVAL) + if codec.respond_to?(:use_mapper_auto_flush) && + (@auto_flush_interval = codec.use_mapper_auto_flush) + @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped) + else + @auto_flusher = NoopRunner.new + end + @decode_block = lambda {|*| true } @eviction_block = nil end # ============================================== @@ -121,20 +139,21 @@ self end # used to add a non-default cleaner interval def cleaner_interval(interval) - @cleaner.stop - @cleaner = MapCleaner.new(self, interval.to_i) + @cleaner.stop if @cleaner + @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup) self end # used to add a non-default eviction block def eviction_block(block) @eviction_block = block self end + # end Constructional/builder methods # ============================================== # ============================================== # IdentityMapCodec API @@ -180,15 +199,38 @@ end end def close() cleaner.stop + auto_flusher.stop all_codecs.each(&:close) end # end Codec API # ============================================== + def auto_flush_mapped + if !identity_count.zero? + nowf = Time.now.to_f + identity_map.each do |identity, compo| + next if compo.auto_flush_timeout.zero? + next unless nowf > compo.auto_flush_timeout + compo.codec.auto_flush + # at eof (tail and read) no more lines for a while or ever + # so reset compo.auto_flush_timeout + compo.auto_flush_timeout = 0 + end + end + end + + def flush_mapped(listener) + listener_has_path = listener.respond_to?(:path) + identity_map.each do |identity, compo| + listener.path = identity if listener_has_path + compo.codec.auto_flush(listener) + end + end + def all_codecs no_streams? ? @base_codecs : identity_map.values.map(&:codec) end def max_limit @@ -202,18 +244,20 @@ # support cleaning of stale stream/codecs # a stream is considered stale if it has not # been accessed in the last @evict_timeout # period (default 1 hour) def map_cleanup - cut_off = Time.now.to_i - # delete_if is atomic - # contents should not mutate during this call - identity_map.delete_if do |identity, compo| - if (flag = compo.timeout <= cut_off) - evict_flush(compo.codec) + if !identity_count.zero? + nowi = Time.now.to_i + # delete_if is atomic + # contents should not mutate during this call + identity_map.delete_if do |identity, compo| + if (flag = compo.eviction_timeout <= nowi) + evict_flush(compo.codec) + end + flag end - flag end current_size_and_limit end def evict_flush(codec) @@ -239,11 +283,11 @@ def codec_without_usage_update(identity) find_codec_value(identity).codec end def eviction_timestamp_for(identity) - find_codec_value(identity).timeout + find_codec_value(identity).eviction_timeout end private def stream_codec(identity) @@ -259,29 +303,41 @@ def record_codec_usage(identity) check_map_limits # only start the cleaner if streams are in use # continuous calls to start are OK cleaner.start + auto_flusher.start compo = find_codec_value(identity) - compo.timeout = eviction_timestamp + now = Time.now + compo.eviction_timeout = eviction_timestamp(now) + compo.auto_flush_timeout = auto_flush_timestamp(now) compo.codec end - def eviction_timestamp - Time.now.to_i + @evict_timeout + def auto_flush_timestamp(now = Time.now) + now.to_f + @auto_flush_interval.to_f end + def eviction_timestamp(now = Time.now) + now.to_i + @evict_timeout + end + def check_map_limits UpperLimitReached.visit(self) EightyPercentWarning.visit(self) end def codec_builder(hash, k) codec = hash.empty? ? @base_codec : @base_codec.clone + codec.use_mapper_auto_flush if using_mapped_auto_flush? compo = CodecValue.new(codec) hash.store(k, compo) end def no_streams? identity_map.empty? + end + + def using_mapped_auto_flush? + !@auto_flush_interval.nil? end end end end