lib/celluloid/actor.rb in celluloid-0.6.2 vs lib/celluloid/actor.rb in celluloid-0.7.0
- old
+ new
@@ -34,12 +34,11 @@
rescue MailboxError
raise DeadActorError, "attempted to call a dead actor"
end
if Celluloid.actor?
- # Yield to the actor scheduler, which resumes us when we get a response
- response = Fiber.yield(call)
+ response = Thread.current[:actor].wait [:call, call.id]
else
# Otherwise we're inside a normal thread, so block
response = Thread.mailbox.receive do |msg|
msg.respond_to?(:call_id) and msg.call_id == call.id
end
@@ -65,19 +64,19 @@
@subject = subject
if subject.respond_to? :mailbox_factory
@mailbox = subject.mailbox_factory
else
- @mailbox = Celluloid::Mailbox.new
+ @mailbox = Mailbox.new
end
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
+ @timers = Timers.new
@proxy = ActorProxy.new(@mailbox, self.class.to_s)
@running = true
- @pending_calls = {}
@thread = Pool.get do
Thread.current[:actor] = self
Thread.current[:mailbox] = @mailbox
@@ -105,25 +104,31 @@
def wait(name)
@signals.wait name
end
# Receive an asynchronous message
- def receive(&block)
- @receivers.receive(&block)
+ def receive(timeout = nil, &block)
+ @receivers.receive(timeout, &block)
end
# Run the actor loop
def run
while @running
begin
- message = @mailbox.receive
+ message = @mailbox.receive(timeout)
rescue ExitEvent => exit_event
- Celluloid::Fiber.new { handle_exit_event exit_event; nil }.resume
+ Task.new(:exit_handler) { handle_exit_event exit_event; nil }.resume
retry
end
- handle_message message
+ if message
+ handle_message message
+ else
+ # No message indicates a timeout
+ @timers.fire
+ @receivers.fire_timers
+ end
end
cleanup ExitEvent.new(@proxy)
rescue MailboxShutdown
# If the mailbox detects shutdown, exit the actor
@@ -133,28 +138,65 @@
handle_crash(ex)
ensure
Pool.put @thread
end
- # Register a fiber waiting for the response to a Celluloid::Call
- def register_fiber(call, fiber)
- raise ArgumentError, "attempted to register a dead fiber" unless fiber.alive?
- @pending_calls[call.id] = fiber
+ # How long to wait until the next timer fires
+ def timeout
+ i1 = @timers.wait_interval
+ i2 = @receivers.wait_interval
+
+ if i1 and i2
+ i1 < i2 ? i1 : i2
+ elsif i1
+ i1
+ else
+ i2
+ end
end
+ # Obtain a hash of tasks that are currently waiting
+ def tasks
+ # A hash of tasks to what they're waiting on is more meaningful to the
+ # end-user, and lets us make a copy of the tasks table, rather than
+ # handing them the one we're using internally across threads, a definite
+ # thread safety shared state no-no
+ tasks = {}
+ current_task = Thread.current[:task]
+ tasks[current_task] = :running if current_task
+
+ @signals.waiting.each do |waitable, task|
+ tasks[task] = waitable
+ end
+
+ tasks
+ end
+
+ # Schedule a block to run at the given time
+ def after(interval)
+ @timers.add(interval) do
+ Task.new(:timer) { yield; nil }.resume
+ end
+ end
+
+ # Sleep for the given amount of time
+ def sleep(interval)
+ task = Task.current
+ @timers.add(interval) { task.resume }
+ Task.suspend
+ end
+
# Handle an incoming message
def handle_message(message)
case message
when Call
- Celluloid::Fiber.new { message.dispatch(@subject); nil }.resume
+ Task.new(:message_handler) { message.dispatch(@subject); nil }.resume
when Response
- fiber = @pending_calls.delete(message.call_id)
+ handled_successfully = signal [:call, message.call_id], message
- if fiber
- fiber.resume message
- else
- Celluloid::Logger.debug("spurious response to call #{message.call_id}")
+ unless handled_successfully
+ Logger.debug("anomalous message! spurious response to call #{message.call_id}")
end
else
@receivers.handle_message(message)
end
message
@@ -172,24 +214,24 @@
raise exit_event.reason if exit_event.reason
end
# Handle any exceptions that occur within a running actor
def handle_crash(exception)
- Celluloid::Logger.crash("#{@subject.class} crashed!", exception)
+ Logger.crash("#{@subject.class} crashed!", exception)
cleanup ExitEvent.new(@proxy, exception)
rescue Exception => ex
- Celluloid::Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
+ Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
end
# Handle cleaning up this actor after it exits
def cleanup(exit_event)
@mailbox.shutdown
@links.send_event exit_event
begin
@subject.finalize if @subject.respond_to? :finalize
rescue Exception => ex
- Celluloid::Logger.crash("#{@subject.class}#finalize crashed!", ex)
+ Logger.crash("#{@subject.class}#finalize crashed!", ex)
end
end
end
end