lib/logstash/codecs/auto_flush.rb in logstash-codec-multiline-2.0.6 vs lib/logstash/codecs/auto_flush.rb in logstash-codec-multiline-2.0.7
- old
+ new
@@ -5,24 +5,52 @@
def initialize(mc, interval)
@mc, @interval = mc, interval
@stopped = Concurrent::AtomicBoolean.new # false by default
end
+ # def start
+ # # can't start if pipeline is stopping
+ # return self if stopped?
+ # if pending?
+ # @task.cancel
+ # create_task
+ # elsif finished?
+ # create_task
+ # # else the task is executing
+ # end
+ # self
+ # end
+
def start
# can't start if pipeline is stopping
return self if stopped?
- if pending?
- @task.reset
- elsif finished?
- @task = Concurrent::ScheduledTask.execute(@interval) do
- @mc.auto_flush()
- end
- # else the task is executing
+
+ if pending? && @task.cancel
+ create_task
+ return self
end
+ # maybe we have a timing edge case
+ # where pending? was true but cancel failed
+ # because the task started running
+ if finished?
+ create_task
+ return self
+ end
+ # else the task is executing
+ # wait for task to complete
+ # flush could feasibly block on queue access
+ @task.value
+ create_task
self
end
+ def create_task
+ @task = Concurrent::ScheduledTask.execute(@interval) do
+ @mc.auto_flush()
+ end
+ end
+
def finished?
return true if @task.nil?
@task.fulfilled?
end
@@ -34,10 +62,14 @@
@stopped.value
end
def stop
@stopped.make_true
+ cancel
+ end
+
+ def cancel
@task.cancel if pending?
end
end
class AutoFlushUnset
@@ -59,8 +91,12 @@
def finished?
true
end
def stop
+ self
+ end
+
+ def cancel
self
end
end end end