Sha256: 6dc095d6f1e368b366637539e84da17738d32ad8d5137972a5ce85aa58ac5412

Contents?: true

Size: 1.8 KB

Versions: 1

Compression:

Stored size: 1.8 KB

Contents

require 'celluloid'

module Nestene
  module Actor
    # Celluloid actor that keeps track of the delayed methods of all autons and
    # moves each one to its auton's +to_execute+ queue once it becomes due.
    #
    # The actor subscribes to the 'state_update' notification topic. Every
    # notification replaces the schedule entries of the reporting auton.
    # @schedule holds [auton_id, delayed_method] pairs sorted by +execute_at+;
    # @timer is the pending Celluloid timer (if any) that re-runs
    # #schedule_methods when the earliest entry becomes due.
    class DelayedScheduler

      include Celluloid
      include Celluloid::Notifications

      def initialize
        subscribe('state_update', :update_schedule)
        @schedule = []
        @timer = nil
      end

      # Handler for 'state_update' notifications: rebuilds the schedule
      # entries for one auton from its current state.
      #
      # topic    - the notification topic (not used).
      # auton_id - identifier of the auton whose state changed.
      # state    - the auton's state; +state.queue.delayed+ is enumerated and
      #            each element must respond to +execute_at+.
      def update_schedule(topic, auton_id, state)
        # Drop whatever was previously scheduled for this auton ...
        @schedule.delete_if { |entry| entry.first == auton_id }

        # ... and replace it with the delayed methods of the new state.
        state.queue.delayed.each do |delayed_method|
          @schedule << [auton_id, delayed_method]
        end

        @schedule.sort_by! { |entry| entry[1].execute_at }

        async.schedule_methods
      end

      # Processes every schedule entry that is due and arms a timer for the
      # next one.
      #
      # For each due entry the auton's storage actor (registered as
      # "storage:<auton_id>") is asked to update the state. Inside that update
      # the head of +state.queue.delayed+ is promoted to +state.queue.to_execute+
      # and, when it has an +every+ interval, re-added with its +execute_at+
      # advanced by that interval.
      def schedule_methods
        @timer.cancel if @timer

        now = Time.now

        until @schedule.empty?
          auton_id, delayed_method = @schedule.first

          # The schedule is sorted, so the first entry that is still in the
          # future ends this pass.
          break if delayed_method.execute_at > now

          @schedule.shift

          Celluloid::Actor["storage:%s" % auton_id].update do |state|
            # The cached schedule entry may be stale, so re-check against the
            # state handed to us. Only remove the head once we know it exists
            # and is due; removing it first would lose a not-yet-due method
            # (or skip one run of a recurring method).
            # NOTE(review): assumes state.queue.delayed is ordered by
            # execute_at, as the original code did - confirm.
            delayed = state.queue.delayed.first

            if delayed && delayed.execute_at <= now
              state.queue.delayed.shift
              state.queue.to_execute << ScheduledMethod.new(delayed)

              if delayed.every
                delayed.execute_at += delayed.every
                state.queue.add_delayed(delayed)
              end
            end
          end
        end

        return if @schedule.empty?

        # The loop only stops on an entry that is later than +now+, so the
        # remaining wait is always positive.
        to_wait = @schedule.first[1].execute_at - now
        @timer = after(to_wait) { async.schedule_methods }
      end
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nestene-0.1.8 lib/nestene/actor/delayed_scheduler.rb