Sha256: 6dc095d6f1e368b366637539e84da17738d32ad8d5137972a5ce85aa58ac5412
Contents?: true
Size: 1.8 KB
Versions: 1
Compression:
Stored size: 1.8 KB
Contents
require 'celluloid'

module Nestene
  module Actor
    # Celluloid actor that tracks the delayed methods of autons and moves
    # them into the owning auton's execution queue once they become due.
    #
    # It subscribes to the 'state_update' notification topic and keeps
    # +@schedule+, an array of +[auton_id, delayed_method]+ pairs sorted by
    # +delayed_method.execute_at+. A single timer is armed for the earliest
    # entry that is not yet due.
    class DelayedScheduler
      include Celluloid
      include Celluloid::Notifications

      def initialize
        @schedule = []
        @timer = nil
        subscribe('state_update', :update_schedule)
      end

      # Notification handler: replaces every schedule entry of +auton_id+
      # with the delayed methods found in +state+, re-sorts the schedule and
      # triggers an asynchronous scheduling pass.
      #
      # @param topic [Object] notification topic (not used by this method)
      # @param auton_id [Object] identifier of the auton whose state changed
      # @param state [#queue] state whose +queue.delayed+ lists the delayed
      #   methods of that auton
      def update_schedule(topic, auton_id, state)
        @schedule.delete_if { |entry| entry.first == auton_id }
        state.queue.delayed.each { |delayed| @schedule << [auton_id, delayed] }
        @schedule.sort_by! { |entry| entry[1].execute_at }
        async.schedule_methods
      end

      # Runs one scheduling pass: cancels the pending timer, dispatches every
      # schedule entry that is due at the time of the call, then arms a timer
      # (or queues another pass) for whatever remains.
      def schedule_methods
        cancel_timer
        now = Time.now
        dispatch_due_entries(now)
        arm_timer(now)
      end

      private

      # Cancels the timer left over from a previous pass, if any.
      def cancel_timer
        return unless @timer

        @timer.cancel
        @timer = nil
      end

      # True when +delayed+ should have been executed by +now+.
      def due?(delayed, now)
        now >= delayed.execute_at
      end

      # Pops entries off the front of the schedule while they are due and
      # asks the matching storage actor to promote the delayed method.
      def dispatch_due_entries(now)
        loop do
          auton_id, delayed_method = @schedule.shift
          break unless auton_id

          unless due?(delayed_method, now)
            # First entry that is not due yet: put it back and stop, the
            # schedule is sorted so nothing behind it is due either.
            @schedule.unshift([auton_id, delayed_method])
            break
          end

          storage = Celluloid::Actor["storage:#{auton_id}"]
          # The registry lookup returns nil for an unregistered name; drop the
          # stale entry instead of calling a method on nil.
          next unless storage

          storage.update { |state| promote_due_head(state, now) }
        end
      end

      # Moves the head of +state.queue.delayed+ into +state.queue.to_execute+
      # when it is due, and re-queues it if it is recurring.
      #
      # The head is only removed when it exists and is due. Previously it was
      # shifted unconditionally, which raised on an empty queue and silently
      # dropped (or, for recurring methods, skipped ahead) an entry that was
      # not due yet.
      #
      # NOTE(review): assumes state.queue.delayed is ordered by execute_at so
      # that its head is the earliest delayed method - confirm against the
      # queue implementation.
      def promote_due_head(state, now)
        pending = state.queue.delayed
        head = pending.first
        return unless head && due?(head, now)

        pending.shift
        state.queue.to_execute << ScheduledMethod.new(head)
        return unless head.every

        # Recurring method: schedule the next occurrence.
        head.execute_at += head.every
        state.queue.add_delayed(head)
      end

      # Arms the timer for the earliest remaining entry, or queues another
      # pass right away when that entry is already due.
      def arm_timer(now)
        return if @schedule.empty?

        next_at = @schedule.first[1].execute_at
        if now >= next_at
          # Only reachable if @schedule was changed while this pass was
          # running; kept so that such an entry is not left waiting.
          async.schedule_methods
        else
          @timer = after(next_at - now) { async.schedule_methods }
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nestene-0.1.8 | lib/nestene/actor/delayed_scheduler.rb |