Sha256: e5f1f73a7e9c16025e579ba024ba8d47ba972cb28a3ea55f929001d587abe880

Contents?: true

Size: 1.51 KB

Versions: 5

Compression:

Stored size: 1.51 KB

Contents

module Workers
  # Fires timers from a dedicated background thread.
  #
  # Scheduled timers live in a SortedSet guarded by a mutex. The background
  # thread repeatedly hands every overdue timer to the pool, re-queues the
  # repeating ones, and then sleeps until the next timer is due (or
  # indefinitely while the schedule is empty).
  #
  # Timer objects must respond to #overdue?, #sec_remaining, #fire, #reset and
  # #repeat. The scheduler always inspects the first element of the set, so
  # timers are presumably ordered earliest-due first -- confirm against the
  # timer class.
  #
  # NOTE(review): nothing in the loop rescues exceptions, so an error raised
  # by a timer method called on the scheduler thread terminates that thread.
  class Scheduler
    include Workers::Helpers

    # Builds the scheduler and immediately starts its background thread.
    #
    # @param options [Hash]
    # @option options [Object] :logger handed to Workers::LogProxy.new
    # @option options [Object] :pool pool that runs fired timers
    #   (defaults to Workers.pool)
    def initialize(options = {})
      @logger = Workers::LogProxy.new(options[:logger])
      @pool = options[:pool] || Workers.pool
      @schedule = SortedSet.new
      @mutex = Mutex.new
      # Created last: the loop uses @mutex, @schedule and @pool right away.
      @thread = Thread.new { start_loop }
    end

    # Adds a timer to the schedule and wakes the scheduler thread so that it
    # recomputes how long it may sleep.
    #
    # @param timer [Object] timer to add
    # @return [nil]
    def schedule(timer)
      @mutex.synchronize { @schedule << timer }
      wakeup

      nil
    end

    # Removes a timer from the schedule, if present.
    #
    # @param timer [Object] timer to remove
    # @return [true] always, whether or not the timer was scheduled
    def unschedule(timer)
      @mutex.synchronize { @schedule.delete(timer) }

      true
    end

    # Wakes the scheduler thread from its sleep.
    #
    # @return [nil]
    # @raise [ThreadError] raised by Thread#wakeup when the scheduler thread
    #   is no longer alive (for example after #dispose)
    def wakeup
      @thread.wakeup

      nil
    end

    # Shuts down the pool, kills the scheduler thread and joins the pool.
    #
    # This shuts down whichever pool the scheduler was given, including the
    # Workers.pool default.
    #
    # @return [nil]
    def dispose
      # The loop only touches the schedule while holding @mutex, so taking it
      # here ensures the thread is never killed halfway through
      # process_overdue.
      @mutex.synchronize do
        @pool.shutdown
        @thread.kill
      end

      # Joined outside the mutex on purpose. A task still running on the pool
      # may call #schedule or #unschedule, both of which need @mutex; joining
      # while holding it would leave that task and this method waiting on
      # each other forever.
      @pool.join

      nil
    end

    # @return [Boolean, nil] whether the scheduler thread is still running
    def alive?
      @thread && @thread.alive?
    end

    private

    # Body of the scheduler thread; loops until the thread is killed.
    def start_loop
      loop do
        delay = @mutex.synchronize do
          process_overdue
          next_delay
        end

        if delay
          # Time passes between the overdue check and the remaining-time
          # query, so the value may have dropped below zero. Kernel#sleep
          # raises ArgumentError on a negative interval, which would kill
          # this thread, so clamp it.
          sleep(delay > 0 ? delay : 0)
        else
          # Empty schedule: sleep until #wakeup is called.
          sleep
        end
      end
    end

    # Removes every overdue timer from the schedule, hands each one to the
    # pool and re-queues those that repeat. Must be called with @mutex held.
    #
    # @return [nil]
    def process_overdue
      overdue = []

      # Drain first and fire afterwards, so that a repeating timer re-added
      # below is not picked up again within the same pass.
      while (timer = @schedule.first) && timer.overdue?
        overdue << timer
        @schedule.delete(timer)
      end

      overdue.each do |due|
        @pool.perform { due.fire }

        # Reset happens on this thread right after the job is handed to the
        # pool, not after #fire has finished.
        due.reset
        @schedule << due if due.repeat
      end

      nil
    end

    # @return [Numeric, nil] seconds until the first scheduled timer is due,
    #   or nil when nothing is scheduled
    def next_delay
      upcoming = @schedule.first
      upcoming.sec_remaining if upcoming
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
workers-0.0.9 lib/workers/scheduler.rb
workers-0.0.8 lib/workers/scheduler.rb
workers-0.0.7 lib/workers/scheduler.rb
workers-0.0.6 lib/workers/scheduler.rb
workers-0.0.5 lib/workers/scheduler.rb