lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.7 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.8
- old
+ new
@@ -1,8 +1,9 @@
# encoding: utf-8
require "logstash/namespace"
require "thread_safe"
+require "concurrent"
# This class is a Codec duck type
# Using Composition, it maps from a stream identity to
# a cloned codec instance via the same API as a Codec
# it implements the codec public API
@@ -35,45 +36,55 @@
:current_size => current_size, :upper_limit => limit)
raise IdentityMapUpperLimitException.new
end
end
- class MapCleaner
- def initialize(imc, interval)
- @imc, @interval = imc, interval
- @running = false
+ class PeriodicRunner
+ def initialize(listener, interval, method_symbol)
+ @listener, @interval = listener, interval
+ @method_symbol = method_symbol
+ @running = Concurrent::AtomicBoolean.new(false)
end
def start
return self if running?
- @running = true
- @thread = Thread.new(@imc) do |imc|
- loop do
+ @running.make_true
+ @thread = Thread.new() do
+ while running? do
sleep @interval
- break if !@running
- imc.map_cleanup
+ break if !running?
+ @listener.send(@method_symbol)
end
end
self
end
def running?
- @running
+ @running.value
end
def stop
return if !running?
- @running = false
- @thread.wakeup if @thread.alive?
+ @running.make_false
+ if @thread.alive?
+ @thread.wakeup
+ @thread.join
+ end
+ @listener = nil
end
end
- # A composite class to hold both the codec and the eviction_timeout
+ class NoopRunner
+ attr_reader :start, :stop
+ def running?() false; end
+ end
+
+ # A composite class to hold both the codec, the eviction_timeout and a last_used timestamp
# instances of this Value Object are stored in the mapping hash
class CodecValue
attr_reader :codec
- attr_accessor :timeout
+ attr_accessor :eviction_timeout, :auto_flush_timeout
def initialize(codec)
@codec = codec
end
end
@@ -90,19 +101,26 @@
# time that the cleaner thread sleeps for
# before it tries to clean out stale mappings
CLEANER_INTERVAL = 60 * 5 # 5 minutes
attr_reader :identity_map
- attr_accessor :base_codec, :cleaner
+ attr_accessor :base_codec, :cleaner, :auto_flusher
def initialize(codec)
@base_codec = codec
@base_codecs = [codec]
@identity_map = ThreadSafe::Hash.new &method(:codec_builder)
@max_identities = MAX_IDENTITIES
@evict_timeout = EVICT_TIMEOUT
- @cleaner = MapCleaner.new(self, CLEANER_INTERVAL)
+ cleaner_interval(CLEANER_INTERVAL)
+ if codec.respond_to?(:use_mapper_auto_flush) &&
+ (@auto_flush_interval = codec.use_mapper_auto_flush)
+ @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped)
+ else
+ @auto_flusher = NoopRunner.new
+ end
+
@decode_block = lambda {|*| true }
@eviction_block = nil
end
# ==============================================
@@ -121,20 +139,21 @@
self
end
# used to add a non-default cleaner interval
def cleaner_interval(interval)
- @cleaner.stop
- @cleaner = MapCleaner.new(self, interval.to_i)
+ @cleaner.stop if @cleaner
+ @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup)
self
end
# used to add a non-default eviction block
def eviction_block(block)
@eviction_block = block
self
end
+
# end Constructional/builder methods
# ==============================================
# ==============================================
# IdentityMapCodec API
@@ -180,15 +199,38 @@
end
end
def close()
cleaner.stop
+ auto_flusher.stop
all_codecs.each(&:close)
end
# end Codec API
# ==============================================
+ def auto_flush_mapped
+ if !identity_count.zero?
+ nowf = Time.now.to_f
+ identity_map.each do |identity, compo|
+ next if compo.auto_flush_timeout.zero?
+ next unless nowf > compo.auto_flush_timeout
+ compo.codec.auto_flush
+ # at eof (tail and read) no more lines for a while or ever
+ # so reset compo.auto_flush_timeout
+ compo.auto_flush_timeout = 0
+ end
+ end
+ end
+
+ def flush_mapped(listener)
+ listener_has_path = listener.respond_to?(:path)
+ identity_map.each do |identity, compo|
+ listener.path = identity if listener_has_path
+ compo.codec.auto_flush(listener)
+ end
+ end
+
def all_codecs
no_streams? ? @base_codecs : identity_map.values.map(&:codec)
end
def max_limit
@@ -202,18 +244,20 @@
# support cleaning of stale stream/codecs
# a stream is considered stale if it has not
# been accessed in the last @evict_timeout
# period (default 1 hour)
def map_cleanup
- cut_off = Time.now.to_i
- # delete_if is atomic
- # contents should not mutate during this call
- identity_map.delete_if do |identity, compo|
- if (flag = compo.timeout <= cut_off)
- evict_flush(compo.codec)
+ if !identity_count.zero?
+ nowi = Time.now.to_i
+ # delete_if is atomic
+ # contents should not mutate during this call
+ identity_map.delete_if do |identity, compo|
+ if (flag = compo.eviction_timeout <= nowi)
+ evict_flush(compo.codec)
+ end
+ flag
end
- flag
end
current_size_and_limit
end
def evict_flush(codec)
@@ -239,11 +283,11 @@
def codec_without_usage_update(identity)
find_codec_value(identity).codec
end
def eviction_timestamp_for(identity)
- find_codec_value(identity).timeout
+ find_codec_value(identity).eviction_timeout
end
private
def stream_codec(identity)
@@ -259,29 +303,41 @@
def record_codec_usage(identity)
check_map_limits
# only start the cleaner if streams are in use
# continuous calls to start are OK
cleaner.start
+ auto_flusher.start
compo = find_codec_value(identity)
- compo.timeout = eviction_timestamp
+ now = Time.now
+ compo.eviction_timeout = eviction_timestamp(now)
+ compo.auto_flush_timeout = auto_flush_timestamp(now)
compo.codec
end
- def eviction_timestamp
- Time.now.to_i + @evict_timeout
+ def auto_flush_timestamp(now = Time.now)
+ now.to_f + @auto_flush_interval.to_f
end
+ def eviction_timestamp(now = Time.now)
+ now.to_i + @evict_timeout
+ end
+
def check_map_limits
UpperLimitReached.visit(self)
EightyPercentWarning.visit(self)
end
def codec_builder(hash, k)
codec = hash.empty? ? @base_codec : @base_codec.clone
+ codec.use_mapper_auto_flush if using_mapped_auto_flush?
compo = CodecValue.new(codec)
hash.store(k, compo)
end
def no_streams?
identity_map.empty?
+ end
+
+ def using_mapped_auto_flush?
+ !@auto_flush_interval.nil?
end
end end end