lib/util/schedule.rb in qpid_proton-0.22.0 vs lib/util/schedule.rb in qpid_proton-0.23.0
- old
+ new
@@ -25,55 +25,39 @@
# min(t1, t2) where nil is treated as "distant future"
def earliest(t1, t2) before_eq(t1, t2) ? t1 : t2; end
end
# @private
- # A sorted, thread-safe list of scheduled Proc.
- # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+ # A time-sorted list of objects. Thread unsafe.
class Schedule
include TimeCompare
- Item = Struct.new(:time, :proc)
+ Entry = Struct.new(:time, :item)
- def initialize()
- @lock = Mutex.new
- @items = []
- @closed = false
- end
+ def initialize() @entries = []; end
+ def empty?() @entries.empty?; end
+
def next_tick()
- @lock.synchronize { @items.first.time unless @items.empty? }
+ @entries.first.time unless @entries.empty?
end
- # @return true if the Schedule was previously empty
- # @raise EOFError if schedule is closed
- # @raise ThreadError if +non_block+ and operation would block
- def add(time, non_block=false, &proc)
- # non_block ignored for now, but we may implement a bounded schedule in future.
- @lock.synchronize do
- raise EOFError if @closed
- if at = (0...@items.size).bsearch { |i| @items[i].time > time }
- @items.insert(at, Item.new(time, proc))
- else
- @items << Item.new(time, proc)
- end
- return @items.size == 1
- end
+ # @param at [Time] Insert item at time +at+
+ # @param at [Numeric] Insert item at +Time.now \+ at+
+ # @param at [0] Insert item at Time.at(0)
+ def insert(at, item)
+ time = case at
+ when 0 then Time.at(0) # Avoid call to Time.now for immediate tasks
+ when Numeric then Time.now + at
+ else at
+ end
+ index = time && ((0...@entries.size).bsearch { |i| @entries[i].time > time })
+ @entries.insert(index || -1, Entry.new(time, item))
end
- # @return true if the Schedule became empty as a result of this call
- def process(now)
- due = []
- empty = @lock.synchronize do
- due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
- @items.empty?
- end
- due.each { |i| i.proc.call() }
- return empty && !due.empty?
+ # Return next item due at or before time, else nil
+ def pop(time)
+ @entries.shift.item if !@entries.empty? && before_eq(@entries.first.time, time)
end
- # #add raises EOFError after #close.
- # #process can still be called to drain the schedule.
- def close()
- @lock.synchronize { @closed = true }
- end
+ def clear() @entries.clear; end
end
end