lib/celluloid/actor.rb in celluloid-0.13.0 vs lib/celluloid/actor.rb in celluloid-0.14.0.pre
- old
+ new
@@ -8,11 +8,11 @@
class DeadActorError < StandardError; end
# A timeout occured before the given request could complete
class TimeoutError < StandardError; end
- # The caller made an error, not the current actor
+ # The sender made an error, not the current actor
class AbortError < StandardError
attr_reader :cause
def initialize(cause)
@cause = cause
@@ -25,11 +25,11 @@
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
# messages.
class Actor
- attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name
+ attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name, :locals
class << self
extend Forwardable
def_delegators "Celluloid::Registry.root", :[], :[]=
@@ -56,46 +56,33 @@
actor.name
end
# Invoke a method on the given actor via its mailbox
def call(mailbox, meth, *args, &block)
- call = SyncCall.new(Thread.mailbox, meth, args, block)
-
- begin
- mailbox << call
- rescue MailboxError
- raise DeadActorError, "attempted to call a dead actor"
- end
-
- Celluloid.suspend(:callwait, call).value
+ proxy = SyncProxy.new(mailbox, "UnknownClass")
+ proxy.method_missing(meth, *args, &block)
end
# Invoke a method asynchronously on an actor via its mailbox
def async(mailbox, meth, *args, &block)
- begin
- mailbox << AsyncCall.new(meth, args, block)
- rescue MailboxError
- # Silently swallow asynchronous calls to dead actors. There's no way
- # to reliably generate DeadActorErrors for async calls, so users of
- # async calls should find other ways to deal with actors dying
- # during an async call (i.e. linking/supervisors)
- end
+ proxy = AsyncProxy.new(mailbox, "UnknownClass")
+ proxy.method_missing(meth, *args, &block)
end
# Call a method asynchronously and retrieve its value later
def future(mailbox, meth, *args, &block)
- future = Future.new
- future.execute(mailbox, meth, args, block)
- future
+ proxy = FutureProxy.new(mailbox, "UnknownClass")
+ proxy.method_missing(meth, *args, &block)
end
# Obtain all running actors in the system
def all
actors = []
Thread.list.each do |t|
- actor = t[:celluloid_actor]
- actors << actor.proxy if actor and actor.respond_to?(:proxy)
+ next unless t.celluloid?
+ next if t.task
+ actors << t.actor.proxy if t.actor && t.actor.respond_to?(:proxy)
end
actors
end
# Watch for exit events from another actor
@@ -140,43 +127,49 @@
rescue DeadActorError
end
end
# Wait for an actor to terminate
- def join(actor)
- actor.thread.join
+ def join(actor, timeout = nil)
+ actor.thread.join(timeout)
actor
end
end
# Wrap the given subject with an Actor
def initialize(subject, options = {})
@subject = subject
@mailbox = options[:mailbox] || Mailbox.new
@exit_handler = options[:exit_handler]
@exclusives = options[:exclusive_methods]
+ @receiver_block_executions = options[:receiver_block_executions]
@task_class = options[:task_class] || Celluloid.task_class
@tasks = TaskSet.new
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
@timers = Timers.new
@running = true
@exclusive = false
@name = nil
+ @locals = {}
@thread = ThreadHandle.new do
- Thread.current[:celluloid_actor] = self
- Thread.current[:celluloid_mailbox] = @mailbox
+ setup_thread
run
end
@proxy = (options[:proxy_class] || ActorProxy).new(self)
@subject.instance_variable_set(OWNER_IVAR, self)
end
+ def setup_thread
+ Thread.current[:celluloid_actor] = self
+ Thread.current[:celluloid_mailbox] = @mailbox
+ end
+
# Run the actor loop
def run
begin
while @running
if message = @mailbox.receive(timeout_interval)
@@ -321,11 +314,23 @@
def handle_message(message)
case message
when SystemEvent
handle_system_event message
when Call
- task(:message_handler, message.method) { message.dispatch(@subject) }
- when Response
+ task(:call, message.method) {
+ if @receiver_block_executions && meth = message.method
+ if meth == :__send__
+ meth = message.arguments.first
+ end
+ if @receiver_block_executions.include?(meth.to_sym)
+ message.execute_block_on_receiver
+ end
+ end
+ message.dispatch(@subject)
+ }
+ when BlockCall
+ task(:invoke_block) { message.dispatch }
+ when BlockResponse, Response
message.dispatch
else
@receivers.handle_message(message)
end
message