Sha256: 80d560cfe39ce1fc5e7576e06ddc8df72f82948770f8c8a0b66a2dc146049c8e
Contents?: true
Size: 1.78 KB
Versions: 2
Compression:
Stored size: 1.78 KB
Contents
module Celluloid
  # Raised by Condition#wait when called while exclusive, and re-raised by
  # Condition#wait when the value handed back from suspension is itself a
  # ConditionError.
  class ConditionError < StandardError; end

  # ConditionVariable-like signaling between tasks and actors
  class Condition
    # Pairs a waiting task (or thread) with the mailbox that its wake-up
    # message is delivered to.
    class Waiter
      attr_reader :condition, :task

      # condition - the Condition being waited on
      # task      - the task or thread that is waiting
      # mailbox   - object responding to #<< and #receive
      def initialize(condition, task, mailbox)
        @condition, @task, @mailbox = condition, task, mailbox
      end

      # Deliver a message to this waiter's mailbox.
      def <<(message)
        @mailbox << message
      end

      # Receive from the mailbox the SignalConditionRequest whose task is the
      # current thread, and return the value it carries.
      def wait
        request = @mailbox.receive do |candidate|
          candidate.is_a?(SignalConditionRequest) && candidate.task == Thread.current
        end
        request.value
      end
    end

    # NOTE(review): @owner is never assigned anywhere in this class, so this
    # reads nil unless something outside this file sets it -- confirm intent.
    attr_reader :owner

    def initialize
      @mutex = Mutex.new # guards every access to @tasks
      @tasks = []        # queued Waiter objects, oldest first
    end

    # Wait for the given signal and return the associated value
    #
    # Raises ConditionError when called while Celluloid is exclusive, or when
    # the value returned from suspension is a ConditionError.
    def wait
      if Celluloid.exclusive?
        raise ConditionError, "cannot wait for signals while exclusive"
      end

      # Inside an actor the waiting unit is the current Task; otherwise it is
      # the calling thread itself.
      task = Thread.current[:celluloid_actor] ? Task.current : Thread.current
      waiter = Waiter.new(self, task, Celluloid.mailbox)
      @mutex.synchronize { @tasks << waiter }

      outcome = Celluloid.suspend(:condwait, waiter)
      return outcome unless outcome.is_a?(ConditionError)

      raise outcome
    end

    # Send a signal to the first task waiting on this condition
    #
    # When nobody is waiting, a debug message is logged instead.
    def signal(value = nil)
      @mutex.synchronize do
        next_waiter = @tasks.shift
        if next_waiter
          next_waiter << SignalConditionRequest.new(next_waiter.task, value)
        else
          Logger.debug("Celluloid::Condition signaled spuriously")
        end
      end
    end

    # Broadcast a value to all waiting tasks
    #
    # Every queued waiter receives its own SignalConditionRequest, after which
    # the queue is emptied.
    def broadcast(value = nil)
      @mutex.synchronize do
        @tasks.each do |queued|
          queued << SignalConditionRequest.new(queued.task, value)
        end
        @tasks.clear
      end
    end

    alias_method :inspect, :to_s
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
celluloid-0.14.1 | lib/celluloid/condition.rb |
celluloid-0.14.1.pre | lib/celluloid/condition.rb |