Sha256: e2c176ce55a11133b96f050277da089eb3073974c69c6051e69cdff908c2b119

Contents?: true

Size: 1.37 KB

Versions: 1

Compression:

Stored size: 1.37 KB

Contents

# ConditionVariable lives in 'thread' on old Rubies; on modern Rubies this is a no-op.
require 'thread'

module Workers
  # Runs a background thread that watches an ordered set of timers and hands
  # each overdue timer's +fire+ call to a worker pool.
  #
  # Timers are expected to respond to +overdue?+, +sec_remaining+, +fire+,
  # +reset+ and +repeat+, and to be orderable so the soonest timer comes first
  # in the SortedSet (assumption based on how they are used here; the timer
  # class itself is defined elsewhere).
  class Scheduler
    # @param options [Hash] optional settings
    # @option options [Object] :pool object responding to +perform+,
    #   +shutdown+ and +join+; defaults to a new Workers::Pool
    def initialize(options = {})
      @pool = options[:pool] || Workers::Pool.new
      @schedule = SortedSet.new
      @mutex = Mutex.new
      # The loop thread waits on this condition instead of Kernel#sleep so a
      # wake-up sent just before it starts waiting cannot be lost.
      @wakeup = ConditionVariable.new
      # Started last: the loop reads every instance variable assigned above.
      @thread = Thread.new { start_loop }
    end

    # Adds a timer and wakes the loop so the next delay is recomputed.
    #
    # @param timer [Object] the timer to add
    # @return [nil]
    def schedule(timer)
      @mutex.synchronize { @schedule << timer }
      wakeup

      nil
    end

    # Removes a timer from the schedule.
    #
    # @param timer [Object] the timer to remove
    # @return [true] always, whether or not the timer was present
    def unschedule(timer)
      @mutex.synchronize { @schedule.delete(timer) }

      true
    end

    # Interrupts the loop's current wait so it re-examines the schedule.
    # Safe to call when the loop thread is not waiting or has already been
    # killed; signalling with no waiter does nothing.
    #
    # @return [nil]
    def wakeup
      @wakeup.signal

      nil
    end

    # Stops the scheduler loop, then shuts the pool down and joins it.
    #
    # @return [nil]
    def dispose
      # Kill under the lock so the loop thread is never stopped half-way
      # through process_overdue (it holds the lock whenever it is not waiting).
      @mutex.synchronize { @thread.kill }

      # Stop the pool outside the lock: a task still running on the pool may
      # call #schedule or #unschedule, which need the lock, and would
      # otherwise deadlock against a join performed while holding it.
      @pool.shutdown
      @pool.join

      nil
    end

    private

    # Body of the scheduler thread: dispatch overdue timers, then wait until
    # the next timer is due or until #wakeup is signalled. Never returns
    # normally; the thread is ended by #dispose.
    def start_loop
      @mutex.synchronize do
        loop do
          process_overdue
          # Releases the lock while waiting and re-acquires it before
          # returning. A nil timeout waits until signalled.
          @wakeup.wait(@mutex, next_delay)
        end
      end
    end

    # Removes every overdue timer from the schedule, submits its +fire+ call
    # to the pool, resets it, and re-adds it when it repeats.
    # Must be called with the lock held.
    #
    # @return [nil]
    def process_overdue
      overdue = []

      # Pull timers out of the set before touching them; presumably +reset+
      # changes the value the set orders by, so resetting in place would
      # corrupt the ordering (assumption, the timer class is not visible here).
      while (due = @schedule.first) && due.overdue?
        overdue << due
        @schedule.delete(due)
      end

      overdue.each do |timer|
        @pool.perform { timer.fire }

        timer.reset
        @schedule << timer if timer.repeat
      end

      nil
    end

    # Seconds until the earliest timer is due.
    #
    # @return [Numeric, nil] a non-negative delay, or nil when nothing is
    #   scheduled (meaning "wait until woken")
    def next_delay
      upcoming = @schedule.first
      return nil unless upcoming

      # A timer can become due between process_overdue and this call; a
      # negative timeout would raise ArgumentError and kill the loop thread.
      [upcoming.sec_remaining, 0].max
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
workers-0.0.3 lib/workers/scheduler.rb