require 'flydata-core/logger' module Flydata module Helper class Scheduler include FlydataCore::Logger RUN_INTERVAL = 1.0 #second RUN_GC_INTERVAL = 60.0 * 5 #seconds DEBUG_DUMP_INTERVAL = 60.0 #seconds def initialize(helper_conf, server) @stop_flag = ServerEngine::BlockingFlag.new @server = server self.logger = server.logger reload(helper_conf) end attr_reader :server, :helper_conf def start log_info("start") @thread = Thread.new(&method(:run)) self end def run until @stop_flag.set? run_once end rescue => e log_error("unexpected error during running. error:#{e}\n" + e.backtrace.join("\n")) retry unless @stop_flag.wait_for_set(5.0) ensure log_debug("finish running") end def wake log_debug("wake") @stop_flag.reset! end def stop log_info("stop") @stop_flag.set! end def reload(helper_conf) log_info("reload") @helper_conf = helper_conf update_scheduled_actions end def shutdown log_info("shutdown") stop join self end def join log_debug("join") stop if @thread @thread.join @thread = nil end end # For debug def running? !!(@thread and @thread.alive?) end def scheduled_actions @scheduled_actions.dup end private def update_scheduled_actions @scheduled_actions = helper_conf.scheduled_actions @scheduled_actions.each {|k, v| v[:name] = k v[:last_request_time] = -1 } end def run_once start_time = Time.now.to_f # Check scheduled actions @scheduled_actions.each do |name, action| if (start_time - action[:last_request_time]) >= action[:check_interval] ret = @server.action_ownership_channel.request_action(name) action[:last_request_time] = Time.now.to_f #@server.action_ownership_channel.dump end end dump_queue_if_needed run_gc_if_needed wait_until_next_turn(start_time) end def dump_queue_if_needed now = Time.now @last_dump_time ||= now if (@last_dump_time + DEBUG_DUMP_INTERVAL) < now if File.exists?(FLYDATA_DEBUG_FILE) log_info(@server.action_ownership_channel.dump) rescue nil end @last_dump_time = now end end def run_gc_if_needed now = Time.now @last_gc_time ||= now if (@last_gc_time + RUN_GC_INTERVAL) < now GC.start @last_gc_time = now end end def wait_until_next_turn(start_time) next_time = start_time + RUN_INTERVAL sleep_time = next_time - Time.now.to_f @stop_flag.wait_for_set(sleep_time) if sleep_time > 0 end def custom_log_items super.merge(prefix: '[scheduler]') end end end end