require 'celluloid' module Nestene module Actor class DelayedScheduler include Celluloid include Celluloid::Notifications def initialize subscribe('state_update', :update_schedule) @schedule = [] @timer = nil end def schedule_struct @schedule.to_structure end def update_schedule topic, auton_id, state @schedule.delete_if{|e| e.first == auton_id} if state state.queue.delayed.each do |d| @schedule << [auton_id, d] end @schedule.sort_by!{|e| e[1].execute_at} async.schedule_methods end end def schedule_methods if @timer @timer.cancel end now = Time.now loop do (auton_id, delayed_method) = @schedule.shift if auton_id execute_in = delayed_method.execute_at - now if execute_in <= 0 Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.delayed.shift execute_in = delayed.execute_at - now if execute_in <= 0 method = ScheduledMethod.new(delayed) state.queue.to_execute << method end if delayed.every delayed.execute_at = now + delayed.every state.queue.add_delayed(delayed) end end else @schedule.unshift [auton_id, delayed_method] break end else break end end unless @schedule.empty? if now >= @schedule.first[1].execute_at async.schedule_methods else to_wait = @schedule.first[1].execute_at - now @timer = after(to_wait){async.schedule_methods} end end end end end end