lib/em-synchrony/thread.rb in em-synchrony-1.0.0 vs lib/em-synchrony/thread.rb in em-synchrony-1.0.1

- old
+ new

@@ -2,23 +2,124 @@ module Synchrony module Thread # Fiber-aware drop-in replacements for thread objects class Mutex - def synchronize( &blk ) - blk.call + def initialize + @waiters = [] + @slept = {} end + + def lock + current = Fiber.current + raise FiberError if @waiters.include?(current) + @waiters << current + Fiber.yield unless @waiters.first == current + true + end + + def locked? + !@waiters.empty? + end + + def _wakeup(fiber) + fiber.resume if @slept.delete(fiber) + end + + def sleep(timeout = nil) + unlock + beg = Time.now + current = Fiber.current + @slept[current] = true + if timeout + timer = EM.add_timer(timeout) do + _wakeup(current) + end + Fiber.yield + EM.cancel_timer timer # if we resumes not via timer + else + Fiber.yield + end + @slept.delete current + yield if block_given? + lock + Time.now - beg + end + + def try_lock + lock unless locked? + end + + def unlock + raise FiberError unless @waiters.first == Fiber.current + @waiters.shift + unless @waiters.empty? + EM.next_tick{ @waiters.first.resume } + end + self + end + + def synchronize + lock + yield + ensure + unlock + end + end class ConditionVariable - def wait( mutex ) - @deferrable = EventMachine::DefaultDeferrable.new - EventMachine::Synchrony.sync @deferrable - @deferrable = nil + # + # Creates a new ConditionVariable + # + def initialize + @waiters = [] end + # + # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. + # + # If +timeout+ is given, this method returns after +timeout+ seconds passed, + # even if no other thread doesn't signal. + # + def wait(mutex, timeout=nil) + current = Fiber.current + pair = [mutex, current] + @waiters << pair + mutex.sleep timeout do + @waiters.delete pair + end + self + end + + def _wakeup(mutex, fiber) + if alive = fiber.alive? + EM.next_tick { + mutex._wakeup(fiber) + } + end + alive + end + + # + # Wakes up the first thread in line waiting for this lock. + # def signal - @deferrable and @deferrable.succeed + while (pair = @waiters.shift) + break if _wakeup(*pair) + end + self + end + + # + # Wakes up all threads waiting for this lock. + # + def broadcast + @waiters.each do |mutex, fiber| + _wakeup(mutex, fiber) + end + @waiters.clear + self end end end end