module Fluent class IdleEventDetector def initialize(initial_interval, continuous_interval, check_interval = 5, idle_timeout = 3600) @mutex = Mutex.new @initial_interval = initial_interval @continuous_interval = continuous_interval @check_interval = check_interval @idle_timeout = idle_timeout end def start(&block) raise "block must be given" unless block_given? @callback = block stop if @thread @last_event_at = @timestamp = Time.current @is_absent = false @thread = Thread.new do loop do check_state sleep @check_interval end end end def stop @thread.exit if @thread @thread = nil @is_absent = false @last_event_at = @timestamp = nil @callback = nil end def notify update_last_event_time end private def update_last_event_time @mutex.synchronize do @last_event_at = @timestamp = Time.current if @is_absent @callback.call(:event_arrived_finally, @last_event_at) if @callback @is_absent = false end end end def check_state @mutex.synchronize do if @is_absent if Time.current - @last_event_at >= @idle_timeout @callback.call(:event_idle_timeout, @last_event_at) if @callback @timestamp = Time.current elsif Time.current - @timestamp >= @continuous_interval @callback.call(:event_still_not_coming, @last_event_at) if @callback @timestamp = Time.current end else if Time.current - @timestamp >= @initial_interval @callback.call(:event_not_coming, @last_event_at) if @callback @is_absent = true @timestamp = Time.current end end end end end end