lib/celluloid/actor.rb in celluloid-0.13.0 vs lib/celluloid/actor.rb in celluloid-0.14.0.pre

- old
+ new

@@ -8,11 +8,11 @@ class DeadActorError < StandardError; end # A timeout occured before the given request could complete class TimeoutError < StandardError; end - # The caller made an error, not the current actor + # The sender made an error, not the current actor class AbortError < StandardError attr_reader :cause def initialize(cause) @cause = cause @@ -25,11 +25,11 @@ # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. class Actor - attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name + attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name, :locals class << self extend Forwardable def_delegators "Celluloid::Registry.root", :[], :[]= @@ -56,46 +56,33 @@ actor.name end # Invoke a method on the given actor via its mailbox def call(mailbox, meth, *args, &block) - call = SyncCall.new(Thread.mailbox, meth, args, block) - - begin - mailbox << call - rescue MailboxError - raise DeadActorError, "attempted to call a dead actor" - end - - Celluloid.suspend(:callwait, call).value + proxy = SyncProxy.new(mailbox, "UnknownClass") + proxy.method_missing(meth, *args, &block) end # Invoke a method asynchronously on an actor via its mailbox def async(mailbox, meth, *args, &block) - begin - mailbox << AsyncCall.new(meth, args, block) - rescue MailboxError - # Silently swallow asynchronous calls to dead actors. There's no way - # to reliably generate DeadActorErrors for async calls, so users of - # async calls should find other ways to deal with actors dying - # during an async call (i.e. linking/supervisors) - end + proxy = AsyncProxy.new(mailbox, "UnknownClass") + proxy.method_missing(meth, *args, &block) end # Call a method asynchronously and retrieve its value later def future(mailbox, meth, *args, &block) - future = Future.new - future.execute(mailbox, meth, args, block) - future + proxy = FutureProxy.new(mailbox, "UnknownClass") + proxy.method_missing(meth, *args, &block) end # Obtain all running actors in the system def all actors = [] Thread.list.each do |t| - actor = t[:celluloid_actor] - actors << actor.proxy if actor and actor.respond_to?(:proxy) + next unless t.celluloid? + next if t.task + actors << t.actor.proxy if t.actor && t.actor.respond_to?(:proxy) end actors end # Watch for exit events from another actor @@ -140,43 +127,49 @@ rescue DeadActorError end end # Wait for an actor to terminate - def join(actor) - actor.thread.join + def join(actor, timeout = nil) + actor.thread.join(timeout) actor end end # Wrap the given subject with an Actor def initialize(subject, options = {}) @subject = subject @mailbox = options[:mailbox] || Mailbox.new @exit_handler = options[:exit_handler] @exclusives = options[:exclusive_methods] + @receiver_block_executions = options[:receiver_block_executions] @task_class = options[:task_class] || Celluloid.task_class @tasks = TaskSet.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @timers = Timers.new @running = true @exclusive = false @name = nil + @locals = {} @thread = ThreadHandle.new do - Thread.current[:celluloid_actor] = self - Thread.current[:celluloid_mailbox] = @mailbox + setup_thread run end @proxy = (options[:proxy_class] || ActorProxy).new(self) @subject.instance_variable_set(OWNER_IVAR, self) end + def setup_thread + Thread.current[:celluloid_actor] = self + Thread.current[:celluloid_mailbox] = @mailbox + end + # Run the actor loop def run begin while @running if message = @mailbox.receive(timeout_interval) @@ -321,11 +314,23 @@ def handle_message(message) case message when SystemEvent handle_system_event message when Call - task(:message_handler, message.method) { message.dispatch(@subject) } - when Response + task(:call, message.method) { + if @receiver_block_executions && meth = message.method + if meth == :__send__ + meth = message.arguments.first + end + if @receiver_block_executions.include?(meth.to_sym) + message.execute_block_on_receiver + end + end + message.dispatch(@subject) + } + when BlockCall + task(:invoke_block) { message.dispatch } + when BlockResponse, Response message.dispatch else @receivers.handle_message(message) end message