Sha256: cdfa10c78abe26a3b44f8fa3158112830b47df3bd8a72ded39445397ecaa3f50

Contents?: true

Size: 1.48 KB

Versions: 4

Compression:

Stored size: 1.48 KB

Contents

module Workers
  # Runs timers from a dedicated background thread.
  #
  # Timers are kept in a SortedSet guarded by a mutex. The background thread
  # repeatedly hands overdue timers to a worker pool to fire, puts repeating
  # timers back on the schedule, and then sleeps until the next timer is due
  # (or indefinitely when the schedule is empty).
  #
  # A timer is any object that responds to +overdue?+, +fire+, +reset+,
  # +repeat+ and +sec_remaining+, and that can be ordered inside a SortedSet.
  # NOTE(review): the schedule order is whatever the timers' comparison
  # defines - presumably "soonest first"; confirm against the timer class.
  class Scheduler
    include Workers::Helpers

    # Creates the scheduler and immediately starts its background thread.
    #
    # @param options [Hash]
    # @option options [Object] :logger passed to Workers::LogProxy.new
    # @option options [Object] :pool pool used to fire timers; a new
    #   Workers::Pool is created when omitted
    def initialize(options = {})
      @logger = Workers::LogProxy.new(options[:logger])
      @pool = options[:pool] || Workers::Pool.new
      @schedule = SortedSet.new
      @mutex = Mutex.new
      # Started last so the loop never observes half-initialized state.
      @thread = Thread.new { start_loop }

      nil
    end

    # Adds a timer to the schedule and wakes the background thread so it can
    # recompute how long to sleep.
    #
    # @param timer [Object] see the class comment for the expected interface
    # @return [nil]
    def schedule(timer)
      @mutex.synchronize do
        @schedule << timer
      end

      wakeup

      nil
    end

    # Removes a timer from the schedule. A timer that is not scheduled is
    # ignored.
    #
    # @param timer [Object]
    # @return [nil]
    def unschedule(timer)
      @mutex.synchronize do
        @schedule.delete(timer)
      end

      nil
    end

    # Interrupts the background thread's sleep so it re-examines the schedule.
    #
    # @return [nil]
    # @raise [ThreadError] if the background thread is dead (Thread#wakeup
    #   refuses dead threads), e.g. after #dispose
    def wakeup
      @thread.wakeup

      nil
    end

    # Stops the background thread and shuts the pool down, then waits on the
    # pool via its +join+.
    # NOTE(review): +shutdown+/+join+ semantics live in the pool class;
    # presumably +join+ blocks until the workers have finished - confirm.
    #
    # @return [nil]
    def dispose
      @mutex.synchronize do
        # Killing while holding the lock guarantees the loop is not in the
        # middle of process_overdue, and that it cannot hand more work to
        # the pool once shutdown has been requested.
        @thread.kill
        @pool.shutdown
      end

      # Join outside the lock: timer callbacks run on pool threads and may
      # call #schedule / #unschedule, which need the same mutex. Joining
      # while holding it would leave both sides waiting on each other.
      @pool.join

      nil
    end

    # @return [Boolean, nil] whether the background thread is still running
    def alive?
      @thread && @thread.alive?
    end

    private

    # Body of the background thread: process what is due, then sleep until
    # the next timer (or until #wakeup when nothing is scheduled).
    def start_loop
      loop do
        delay = nil

        @mutex.synchronize do
          process_overdue
          delay = next_delay
        end

        if delay
          # Kernel#sleep raises ArgumentError for a negative interval, which
          # would silently end this thread; treat "already due" as 0.
          sleep(delay < 0 ? 0 : delay)
        else
          sleep
        end
      end

      nil
    end

    # Removes every overdue timer from the front of the schedule, submits
    # each one to the pool to fire, resets it, and re-adds it when it
    # repeats. Must be called with @mutex held.
    #
    # @return [nil]
    def process_overdue
      overdue = []

      # Collect first, re-add later: a repeating timer that is still overdue
      # after reset must not be picked up again within this pass.
      while (due = @schedule.first) && due.overdue?
        @schedule.delete(due)
        overdue << due
      end

      overdue.each do |timer|
        @pool.perform { timer.fire }

        timer.reset
        @schedule << timer if timer.repeat
      end

      nil
    end

    # @return [Numeric, nil] +sec_remaining+ of the first scheduled timer, or
    #   nil when the schedule is empty
    def next_delay
      upcoming = @schedule.first
      upcoming ? upcoming.sec_remaining : nil
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
workers-0.6.1 lib/workers/scheduler.rb
workers-0.6.0 lib/workers/scheduler.rb
workers-0.5.0 lib/workers/scheduler.rb
workers-0.4.0 lib/workers/scheduler.rb