Sha256: e2c176ce55a11133b96f050277da089eb3073974c69c6051e69cdff908c2b119
Contents?: true
Size: 1.37 KB
Versions: 1
Compression:
Stored size: 1.37 KB
Contents
module Workers
  # Runs timers on a dedicated background thread and hands the callback of
  # every overdue timer to a worker pool.
  #
  # Objects placed in the schedule must respond to +overdue?+,
  # +sec_remaining+, +fire+, +reset+ and +repeat+. They are stored in a
  # SortedSet, so the scheduler always inspects the element that sorts first
  # (presumably the timer that fires soonest; the ordering is defined by the
  # timer class, which is not part of this file).
  class Scheduler
    # Creates the scheduler and immediately starts its background thread.
    #
    # @param options [Hash] optional settings
    # @option options [Object] :pool pool used to run timer callbacks; it must
    #   respond to +perform+, +shutdown+ and +join+. Defaults to a new
    #   Workers::Pool.
    def initialize(options = {})
      @pool = options[:pool] || Workers::Pool.new
      @schedule = SortedSet.new
      @mutex = Mutex.new
      @thread = Thread.new { start_loop }
    end

    # Adds a timer to the schedule and wakes the scheduler thread so it can
    # recompute how long to sleep.
    #
    # @param timer [Object] timer to add
    # @return [nil]
    def schedule(timer)
      @mutex.synchronize { @schedule << timer }
      wakeup

      nil
    end

    # Removes a timer from the schedule.
    #
    # @param timer [Object] timer to remove
    # @return [true] always, whether or not the timer was scheduled
    def unschedule(timer)
      @mutex.synchronize { @schedule.delete(timer) }

      true
    end

    # Interrupts the scheduler thread's current sleep.
    #
    # NOTE(review): Thread#wakeup raises ThreadError when the thread is dead,
    # so calling this (or #schedule) after #dispose is expected to raise.
    # A wakeup that lands between the mutex release and the sleep in
    # #start_loop may also be missed - confirm whether that matters to callers.
    #
    # @return [nil]
    def wakeup
      @thread.wakeup

      nil
    end

    # Shuts down the pool, waits for it, and kills the scheduler thread.
    #
    # Everything happens while holding the mutex, so the scheduler thread
    # cannot be in the middle of #process_overdue when it is killed.
    #
    # NOTE(review): the pool is joined while the mutex is held; if a running
    # callback calls #schedule or #unschedule on this scheduler it would block
    # on the same mutex - confirm against Pool#join semantics.
    #
    # @return [nil]
    def dispose
      @mutex.synchronize do
        @pool.shutdown
        @pool.join
        @thread.kill
      end

      nil
    end

    private

    # Body of the scheduler thread: dispatch overdue timers, then sleep until
    # the next timer is due, or indefinitely when the schedule is empty.
    def start_loop
      loop do
        delay = @mutex.synchronize do
          process_overdue
          next_delay
        end

        # Sleep outside the lock so #schedule and #unschedule are not blocked.
        delay ? sleep(delay) : sleep
      end

      nil
    end

    # Removes every overdue timer from the front of the schedule, queues its
    # callback on the pool, resets it, and re-adds it when it repeats.
    # Callers are expected to hold the mutex.
    #
    # @return [nil]
    def process_overdue
      overdue = []

      # Collect first and dispatch afterwards: a repeating timer that is
      # re-added below must not be examined again during this pass.
      while (timer = @schedule.first) && timer.overdue?
        overdue << timer
        @schedule.delete(timer)
      end

      overdue.each do |timer|
        @pool.perform { timer.fire }

        timer.reset
        @schedule << timer if timer.repeat
      end

      nil
    end

    # Seconds to sleep before the first scheduled timer is due.
    #
    # The value is clamped at zero: a timer can become overdue between
    # #process_overdue and this call, and Kernel#sleep raises ArgumentError
    # for a negative interval, which would terminate the scheduler thread.
    #
    # @return [Numeric, nil] non-negative delay, or nil when nothing is
    #   scheduled
    def next_delay
      timer = @schedule.first
      return nil unless timer

      [timer.sec_remaining, 0].max
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
workers-0.0.3 | lib/workers/scheduler.rb |