lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.4 vs lib/logstash/codecs/identity_map_codec.rb in logstash-codec-multiline-2.0.5

- old
+ new

@@ -61,11 +61,11 @@ end def stop return if !running? @running = false - @thread.wakeup + @thread.wakeup if @thread.alive? end end # A composite class to hold both the codec and the eviction_timeout # instances of this Value Object are stored in the mapping hash @@ -135,32 +135,50 @@ end # end Constructional/builder methods # ============================================== # ============================================== + # IdentityMapCodec API + def evict(identity) + # maybe called more than once + if (compo = identity_map.delete(identity)) + compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush) + end + end + # end IdentityMapCodec API + # ============================================== + + # ============================================== # Codec API def decode(data, identity = nil, &block) @decode_block = block if @decode_block != block stream_codec(identity).decode(data, &block) end + def accept(listener) + stream_codec(listener.path).accept(listener) + end + alias_method :<<, :decode def encode(event, identity = nil) stream_codec(identity).encode(event) end - # this method will not be called from - # the input or the pipeline unless - # we implement codec flush on shutdown - # problematic, because we may not have - # received all the multiline parts yet. - # but if we don't flush we will lose data def flush(&block) all_codecs.each do |codec| #let ruby do its default args thing - block.nil? ? codec.flush : codec.flush(&block) + if block_given? + codec.flush(&block) + else + if codec.respond_to?(:auto_flush) + codec.auto_flush + else + #try this, no guarantees + codec.flush + end + end end end def close() cleaner.stop @@ -189,14 +207,25 @@ cut_off = Time.now.to_i # delete_if is atomic # contents should not mutate during this call identity_map.delete_if do |identity, compo| if (flag = compo.timeout <= cut_off) - compo.codec.flush(&(@eviction_block || @decode_block)) + evict_flush(compo.codec) end flag end current_size_and_limit + end + + def evict_flush(codec) + if codec.respond_to?(:auto_flush) + codec.auto_flush + else + if (block = @eviction_block || @decode_block) + codec.flush(&block) + end + # all else - can't do anything + end end def current_size_and_limit [identity_count, max_limit] end