lib/util/schedule.rb in qpid_proton-0.22.0 vs lib/util/schedule.rb in qpid_proton-0.23.0

- old
+ new

@@ -25,55 +25,39 @@ # min(t1, t2) where nil is treated as "distant future" def earliest(t1, t2) before_eq(t1, t2) ? t1 : t2; end end # @private - # A sorted, thread-safe list of scheduled Proc. - # Note: calls to #process are always serialized, but calls to #add may be concurrent. + # A time-sorted list of objects. Thread unsafe. class Schedule include TimeCompare - Item = Struct.new(:time, :proc) + Entry = Struct.new(:time, :item) - def initialize() - @lock = Mutex.new - @items = [] - @closed = false - end + def initialize() @entries = []; end + def empty?() @entries.empty?; end + def next_tick() - @lock.synchronize { @items.first.time unless @items.empty? } + @entries.first.time unless @entries.empty? end - # @return true if the Schedule was previously empty - # @raise EOFError if schedule is closed - # @raise ThreadError if +non_block+ and operation would block - def add(time, non_block=false, &proc) - # non_block ignored for now, but we may implement a bounded schedule in future. - @lock.synchronize do - raise EOFError if @closed - if at = (0...@items.size).bsearch { |i| @items[i].time > time } - @items.insert(at, Item.new(time, proc)) - else - @items << Item.new(time, proc) - end - return @items.size == 1 - end + # @param at [Time] Insert item at time +at+ + # @param at [Numeric] Insert item at +Time.now \+ at+ + # @param at [0] Insert item at Time.at(0) + def insert(at, item) + time = case at + when 0 then Time.at(0) # Avoid call to Time.now for immediate tasks + when Numeric then Time.now + at + else at + end + index = time && ((0...@entries.size).bsearch { |i| @entries[i].time > time }) + @entries.insert(index || -1, Entry.new(time, item)) end - # @return true if the Schedule became empty as a result of this call - def process(now) - due = [] - empty = @lock.synchronize do - due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now)) - @items.empty? - end - due.each { |i| i.proc.call() } - return empty && !due.empty? + # Return next item due at or before time, else nil + def pop(time) + @entries.shift.item if !@entries.empty? && before_eq(@entries.first.time, time) end - # #add raises EOFError after #close. - # #process can still be called to drain the schedule. - def close() - @lock.synchronize { @closed = true } - end + def clear() @entries.clear; end end end