lib/celluloid/actor.rb in celluloid-0.5.0 vs lib/celluloid/actor.rb in celluloid-0.6.0
- old
+ new
@@ -1,20 +1,5 @@
-require 'thread'
-
-begin
- require 'fiber'
-rescue LoadError => ex
- if defined? JRUBY_VERSION
- raise LoadError, "Celluloid requires JRuby 1.9 mode. Please pass the --1.9 flag or set JRUBY_OPTS=--1.9"
- elsif defined? Rubinius
- # If we're on Rubinius, we can still work in 1.8 mode
- Fiber = Rubinius::Fiber
- else
- raise ex
- end
-end
-
module Celluloid
# Don't do Actor-like things outside Actor scope
class NotActorError < StandardError; end
# Trying to do something to a dead actor
@@ -39,51 +24,71 @@
attr_reader :proxy
attr_reader :links
attr_reader :mailbox
+ # Invoke a method on the given actor via its mailbox
+ def self.call(mailbox, meth, *args, &block)
+ our_mailbox = Thread.current.mailbox
+ call = SyncCall.new(our_mailbox, meth, args, block)
- # Wrap the given subject object with an Actor
- def initialize(subject)
- @subject = subject
- @mailbox = initialize_mailbox
- @links = Links.new
- @signals = Signals.new
- @proxy = ActorProxy.new(self, @mailbox)
- @running = true
+ begin
+ mailbox << call
+ rescue MailboxError
+ raise DeadActorError, "attempted to call a dead actor"
+ end
- @thread = Pool.get
- @thread[:queue] << proc do
- initialize_thread_locals
- run
+ if Celluloid.actor?
+ # Yield to the actor scheduler, which resumes us when we get a response
+ response = Fiber.yield(call)
+ else
+ # Otherwise we're inside a normal thread, so block
+ response = our_mailbox.receive do |msg|
+ msg.is_a? Response and msg.call_id == call.id
+ end
end
- end
- # Create the mailbox for this actor
- #
- # This implemententation is intended to be overridden in special-case
- # subclasses of Celluloid::Actor which use a custom mailbox
- def initialize_mailbox
- Mailbox.new
+ response.value
end
- # Configure thread locals for the running thread
- def initialize_thread_locals
- Thread.current[:actor] = self
- Thread.current[:actor_proxy] = @proxy
- Thread.current[:mailbox] = @mailbox
+ # Invoke a method asynchronously on an actor via its mailbox
+ def self.async(mailbox, meth, *args, &block)
+ our_mailbox = Thread.current.mailbox
+ begin
+ mailbox << AsyncCall.new(our_mailbox, meth, args, block)
+ rescue MailboxError
+ # Silently swallow asynchronous calls to dead actors. There's no way
+ # to reliably generate DeadActorErrors for async calls, so users of
+ # async calls should find other ways to deal with actors dying
+ # during an async call (i.e. linking/supervisors)
+ end
end
- # Run the actor loop
- def run
- process_messages
- cleanup ExitEvent.new(@proxy)
- rescue Exception => ex
- @running = false
- handle_crash(ex)
- ensure
- Pool.put @thread
+ # Wrap the given subject with an Actor
+ def initialize(subject)
+ @subject = subject
+
+ if subject.respond_to? :mailbox_factory
+ @mailbox = subject.mailbox_factory
+ else
+ @mailbox = Celluloid::Mailbox.new
+ end
+
+ @links = Links.new
+ @signals = Signals.new
+ @receivers = Receivers.new
+ @proxy = ActorProxy.new(@mailbox)
+ @running = true
+ @pending_calls = {}
+
+ @thread = Pool.get do
+ Thread.current[:actor] = self
+ Thread.current[:actor_proxy] = @proxy
+ Thread.current[:mailbox] = @mailbox
+
+ run
+ end
end
# Is this actor alive?
def alive?
@running
@@ -103,60 +108,76 @@
# Wait for the given signal
def wait(name)
@signals.wait name
end
- #######
- private
- #######
+ # Receive an asynchronous message
+ def receive(&block)
+ @receivers.receive(&block)
+ end
- # Process incoming messages
- def process_messages
- pending_calls = {}
-
+ # Run the actor loop
+ def run
while @running
begin
message = @mailbox.receive
- rescue MailboxShutdown
- # If the mailbox detects shutdown, exit the actor
- @running = false
rescue ExitEvent => exit_event
- fiber = Fiber.new do
- initialize_thread_locals
- handle_exit_event exit_event
- end
-
- call = fiber.resume
- pending_calls[call] = fiber if fiber.alive?
-
+ fiber = Celluloid.fiber { handle_exit_event exit_event; nil }
+ run_fiber fiber
retry
end
- case message
- when Call
- fiber = Fiber.new do
- initialize_thread_locals
- message.dispatch(@subject)
- end
+ handle_message message
+ end
- call = fiber.resume
- pending_calls[call] = fiber if fiber.alive?
- when Response
- fiber = pending_calls.delete(message.call)
+ cleanup ExitEvent.new(@proxy)
+ rescue MailboxShutdown
+ # If the mailbox detects shutdown, exit the actor
+ @running = false
+ rescue Exception => ex
+ @running = false
+ handle_crash(ex)
+ ensure
+ Pool.put @thread
+ end
- if fiber
- call = fiber.resume message
- pending_calls[call] = fiber if fiber.alive?
- end
- end # unexpected messages are ignored
+ # Run a method, handling when its Fiber is suspended
+ def run_fiber(fiber, value = nil)
+ result = fiber.resume value
+ if result.is_a? Celluloid::Call
+ @pending_calls[result.id] = fiber if fiber.alive?
+ elsif result
+ warning = "non-call returned from fiber: #{result.class}"
+ Celluloid.logger.debug warning if Celluloid.logger
end
+ nil
end
+ # Handle an incoming message
+ def handle_message(message)
+ case message
+ when Call
+ fiber = Celluloid.fiber { message.dispatch(@subject); nil }
+ run_fiber fiber
+ when Response
+ fiber = @pending_calls.delete(message.call_id)
+
+ if fiber
+ run_fiber fiber, message
+ else
+ warning = "spurious response to call #{message.call_id}"
+ Celluloid.logger.debug if Celluloid.logger
+ end
+ else
+ @receivers.handle_message(message)
+ end
+ message
+ end
+
# Handle exit events received by this actor
def handle_exit_event(exit_event)
- klass = @subject.class
- exit_handler = klass.exit_handler if klass.respond_to? :exit_handler
+ exit_handler = @subject.class.exit_handler
if exit_handler
return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
end
# Reraise exceptions from linked actors
@@ -174,9 +195,15 @@
# Handle cleaning up this actor after it exits
def cleanup(exit_event)
@mailbox.shutdown
@links.send_event exit_event
+
+ begin
+ @subject.finalize if @subject.respond_to? :finalize
+ rescue Exception => finalizer_exception
+ log_error(finalizer_exception, "#{@subject.class}#finalize crashed!")
+ end
end
# Log errors when an actor crashes
def log_error(ex, message = "#{@subject.class} crashed!")
message << "\n#{ex.class}: #{ex.to_s}\n"