lib/celluloid/actor.rb in celluloid-0.7.2 vs lib/celluloid/actor.rb in celluloid-0.8.0
- old
+ new
@@ -18,15 +18,12 @@
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
# messages.
class Actor
extend Registry
+ attr_reader :proxy, :tasks, :links, :mailbox
- attr_reader :proxy
- attr_reader :links
- attr_reader :mailbox
-
# Invoke a method on the given actor via its mailbox
def self.call(mailbox, meth, *args, &block)
call = SyncCall.new(Thread.mailbox, meth, args, block)
begin
@@ -34,19 +31,20 @@
rescue MailboxError
raise DeadActorError, "attempted to call a dead actor"
end
if Celluloid.actor?
- response = Thread.current[:actor].wait [:call, call.id]
+ # The current task will be automatically resumed when we get a response
+ Task.suspend :callwait
else
# Otherwise we're inside a normal thread, so block
response = Thread.mailbox.receive do |msg|
- msg.respond_to?(:call_id) and msg.call_id == call.id
+ msg.respond_to?(:call) and msg.call == call
end
- end
- response.value
+ response.value
+ end
end
# Invoke a method asynchronously on an actor via its mailbox
def self.async(mailbox, meth, *args, &block)
begin
@@ -57,28 +55,36 @@
# async calls should find other ways to deal with actors dying
# during an async call (i.e. linking/supervisors)
end
end
+ # Call a method asynchronously and retrieve its value later
+ def self.future(mailbox, meth, *args, &block)
+ future = Future.new
+ future.execute(mailbox, meth, args, block)
+ future
+ end
+
# Wrap the given subject with an Actor
def initialize(subject)
@subject = subject
if subject.respond_to? :mailbox_factory
@mailbox = subject.mailbox_factory
else
@mailbox = Mailbox.new
end
+ @proxy = ActorProxy.new(@mailbox, subject.class.to_s)
+ @tasks = Set.new
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
@timers = Timers.new
- @proxy = ActorProxy.new(@mailbox, subject.class.to_s)
@running = true
- @thread = Pool.get do
+ @thread = ThreadPool.get do
Thread.current[:actor] = self
Thread.current[:mailbox] = @mailbox
run
end
@@ -135,11 +141,11 @@
@running = false
rescue Exception => ex
@running = false
handle_crash(ex)
ensure
- Pool.put @thread
+ ThreadPool.put @thread
end
# How long to wait until the next timer fires
def timeout
i1 = @timers.wait_interval
@@ -152,31 +158,10 @@
else
i2
end
end
- # Obtain a hash of tasks that are currently waiting
- def tasks
- # A hash of tasks to what they're waiting on is more meaningful to the
- # end-user, and lets us make a copy of the tasks table, rather than
- # handing them the one we're using internally across threads, a definite
- # thread safety shared state no-no
- tasks = {}
- current_task = Task.current rescue nil
- tasks[current_task] = :running if current_task
-
- @signals.waiting.each do |waitable, waiters|
- if waiters.is_a? Enumerable
- waiters.each { |waiter| tasks[waiter] = waitable }
- else
- tasks[waiters] = waitable
- end
- end
-
- tasks
- end
-
# Schedule a block to run at the given time
def after(interval)
@timers.add(interval) do
Task.new(:timer) { yield }.resume
end
@@ -184,24 +169,20 @@
# Sleep for the given amount of time
def sleep(interval)
task = Task.current
@timers.add(interval) { task.resume }
- Task.suspend
+ Task.suspend :sleeping
end
# Handle an incoming message
def handle_message(message)
case message
when Call
Task.new(:message_handler) { message.dispatch(@subject) }.resume
when Response
- handled_successfully = signal [:call, message.call_id], message
-
- unless handled_successfully
- Logger.debug("anomalous message! spurious response to call #{message.call_id}")
- end
+ message.call.task.resume message.value
else
@receivers.handle_message(message)
end
message
end
@@ -228,10 +209,10 @@
# Handle cleaning up this actor after it exits
def cleanup(exit_event)
@mailbox.shutdown
@links.send_event exit_event
- tasks.each { |task, _| task.terminate }
+ tasks.each { |task| task.terminate }
begin
@subject.finalize if @subject.respond_to? :finalize
rescue Exception => ex
Logger.crash("#{@subject.class}#finalize crashed!", ex)