Sha256: 39bfd841c757fd26efd00d59dcf15ebf0353269ba66a9f8d78e79c1c7affbcfd
Contents?: true
Size: 1.58 KB
Versions: 1
Compression:
Stored size: 1.58 KB
Contents
# encoding: utf-8 require "concurrent" module LogStash module Codecs class AutoFlush def initialize(mc, interval) @mc, @interval = mc, interval @stopped = Concurrent::AtomicBoolean.new # false by default end # def start # # can't start if pipeline is stopping # return self if stopped? # if pending? # @task.cancel # create_task # elsif finished? # create_task # # else the task is executing # end # self # end def start # can't start if pipeline is stopping return self if stopped? if pending? && @task.cancel create_task return self end # maybe we have a timing edge case # where pending? was true but cancel failed # because the task started running if finished? create_task return self end # else the task is executing # wait for task to complete # flush could feasibly block on queue access @task.value create_task self end def create_task @task = Concurrent::ScheduledTask.execute(@interval) do @mc.auto_flush() end end def finished? return true if @task.nil? @task.fulfilled? end def pending? @task && @task.pending? end def stopped? @stopped.value end def stop @stopped.make_true cancel end def cancel @task.cancel if pending? end end class AutoFlushUnset def initialize(mc, interval) end def pending? false end def stopped? true end def start self end def finished? true end def stop self end def cancel self end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
logstash-codec-multiline-2.0.7 | lib/logstash/codecs/auto_flush.rb |