lib/celluloid/actor.rb in celluloid-0.6.0 vs lib/celluloid/actor.rb in celluloid-0.6.1
- old
+ new
@@ -18,20 +18,18 @@
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
# messages.
class Actor
extend Registry
- include Linking
attr_reader :proxy
attr_reader :links
attr_reader :mailbox
# Invoke a method on the given actor via its mailbox
def self.call(mailbox, meth, *args, &block)
- our_mailbox = Thread.current.mailbox
- call = SyncCall.new(our_mailbox, meth, args, block)
+ call = SyncCall.new(Thread.mailbox, meth, args, block)
begin
mailbox << call
rescue MailboxError
raise DeadActorError, "attempted to call a dead actor"
@@ -40,23 +38,22 @@
if Celluloid.actor?
# Yield to the actor scheduler, which resumes us when we get a response
response = Fiber.yield(call)
else
# Otherwise we're inside a normal thread, so block
- response = our_mailbox.receive do |msg|
- msg.is_a? Response and msg.call_id == call.id
+ response = Thread.mailbox.receive do |msg|
+ msg.respond_to?(:call_id) and msg.call_id == call.id
end
end
response.value
end
# Invoke a method asynchronously on an actor via its mailbox
def self.async(mailbox, meth, *args, &block)
- our_mailbox = Thread.current.mailbox
begin
- mailbox << AsyncCall.new(our_mailbox, meth, args, block)
+ mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
rescue MailboxError
# Silently swallow asynchronous calls to dead actors. There's no way
# to reliably generate DeadActorErrors for async calls, so users of
# async calls should find other ways to deal with actors dying
# during an async call (i.e. linking/supervisors)
@@ -74,18 +71,17 @@
end
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
- @proxy = ActorProxy.new(@mailbox)
+ @proxy = ActorProxy.new(@mailbox, self.class.to_s)
@running = true
@pending_calls = {}
@thread = Pool.get do
- Thread.current[:actor] = self
- Thread.current[:actor_proxy] = @proxy
- Thread.current[:mailbox] = @mailbox
+ Thread.current[:actor] = self
+ Thread.current[:mailbox] = @mailbox
run
end
end
@@ -119,12 +115,11 @@
def run
while @running
begin
message = @mailbox.receive
rescue ExitEvent => exit_event
- fiber = Celluloid.fiber { handle_exit_event exit_event; nil }
- run_fiber fiber
+ Celluloid::Fiber.new { handle_exit_event exit_event; nil }.resume
retry
end
handle_message message
end
@@ -138,32 +133,25 @@
handle_crash(ex)
ensure
Pool.put @thread
end
- # Run a method, handling when its Fiber is suspended
- def run_fiber(fiber, value = nil)
- result = fiber.resume value
- if result.is_a? Celluloid::Call
- @pending_calls[result.id] = fiber if fiber.alive?
- elsif result
- warning = "non-call returned from fiber: #{result.class}"
- Celluloid.logger.debug warning if Celluloid.logger
- end
- nil
+ # Register a fiber waiting for the response to a Celluloid::Call
+ def register_fiber(call, fiber)
+ raise ArgumentError, "attempted to register a dead fiber" unless fiber.alive?
+ @pending_calls[call.id] = fiber
end
# Handle an incoming message
def handle_message(message)
case message
when Call
- fiber = Celluloid.fiber { message.dispatch(@subject); nil }
- run_fiber fiber
+ Celluloid::Fiber.new { message.dispatch(@subject); nil }.resume
when Response
fiber = @pending_calls.delete(message.call_id)
if fiber
- run_fiber fiber, message
+ fiber.resume message
else
warning = "spurious response to call #{message.call_id}"
Celluloid.logger.debug if Celluloid.logger
end
else