lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.4 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.5
- old
+ new
@@ -61,11 +61,11 @@
end
def stop
return if !running?
@running = false
- @thread.wakeup
+ @thread.wakeup if @thread.alive?
end
end
# A composite class to hold both the codec and the eviction_timeout
# instances of this Value Object are stored in the mapping hash
@@ -135,32 +135,50 @@
end
# end Constructional/builder methods
# ==============================================
# ==============================================
+ # IdentityMapCodec API
+ def evict(identity)
+ # maybe called more than once
+ if (compo = identity_map.delete(identity))
+ compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush)
+ end
+ end
+ # end IdentityMapCodec API
+ # ==============================================
+
+ # ==============================================
# Codec API
def decode(data, identity = nil, &block)
@decode_block = block if @decode_block != block
stream_codec(identity).decode(data, &block)
end
+ def accept(listener)
+ stream_codec(listener.path).accept(listener)
+ end
+
alias_method :<<, :decode
def encode(event, identity = nil)
stream_codec(identity).encode(event)
end
- # this method will not be called from
- # the input or the pipeline unless
- # we implement codec flush on shutdown
- # problematic, because we may not have
- # received all the multiline parts yet.
- # but if we don't flush we will lose data
def flush(&block)
all_codecs.each do |codec|
#let ruby do its default args thing
- block.nil? ? codec.flush : codec.flush(&block)
+ if block_given?
+ codec.flush(&block)
+ else
+ if codec.respond_to?(:auto_flush)
+ codec.auto_flush
+ else
+ #try this, no guarantees
+ codec.flush
+ end
+ end
end
end
def close()
cleaner.stop
@@ -189,14 +207,25 @@
cut_off = Time.now.to_i
# delete_if is atomic
# contents should not mutate during this call
identity_map.delete_if do |identity, compo|
if (flag = compo.timeout <= cut_off)
- compo.codec.flush(&(@eviction_block || @decode_block))
+ evict_flush(compo.codec)
end
flag
end
current_size_and_limit
+ end
+
+ def evict_flush(codec)
+ if codec.respond_to?(:auto_flush)
+ codec.auto_flush
+ else
+ if (block = @eviction_block || @decode_block)
+ codec.flush(&block)
+ end
+ # all else - can't do anything
+ end
end
def current_size_and_limit
[identity_count, max_limit]
end