lib/logstash/codecs/auto_flush.rb in logstash-codec-multiline-2.0.6 vs lib/logstash/codecs/auto_flush.rb in logstash-codec-multiline-2.0.7

- old
+ new

@@ -5,24 +5,52 @@ def initialize(mc, interval) @mc, @interval = mc, interval @stopped = Concurrent::AtomicBoolean.new # false by default end + # def start + # # can't start if pipeline is stopping + # return self if stopped? + # if pending? + # @task.cancel + # create_task + # elsif finished? + # create_task + # # else the task is executing + # end + # self + # end + def start # can't start if pipeline is stopping return self if stopped? - if pending? - @task.reset - elsif finished? - @task = Concurrent::ScheduledTask.execute(@interval) do - @mc.auto_flush() - end - # else the task is executing + + if pending? && @task.cancel + create_task + return self end + # maybe we have a timing edge case + # where pending? was true but cancel failed + # because the task started running + if finished? + create_task + return self + end + # else the task is executing + # wait for task to complete + # flush could feasibly block on queue access + @task.value + create_task self end + def create_task + @task = Concurrent::ScheduledTask.execute(@interval) do + @mc.auto_flush() + end + end + def finished? return true if @task.nil? @task.fulfilled? end @@ -34,10 +62,14 @@ @stopped.value end def stop @stopped.make_true + cancel + end + + def cancel @task.cancel if pending? end end class AutoFlushUnset @@ -59,8 +91,12 @@ def finished? true end def stop + self + end + + def cancel self end end end end