lib/celluloid/actor.rb in celluloid-0.14.1 vs lib/celluloid/actor.rb in celluloid-0.15.0.pre

- old
+ new

@@ -1,19 +1,19 @@ require 'timers' module Celluloid # Don't do Actor-like things outside Actor scope - class NotActorError < StandardError; end + class NotActorError < Celluloid::Error; end # Trying to do something to a dead actor - class DeadActorError < StandardError; end + class DeadActorError < Celluloid::Error; end # A timeout occured before the given request could complete - class TimeoutError < StandardError; end + class TimeoutError < Celluloid::Error; end # The sender made an error, not the current actor - class AbortError < StandardError + class AbortError < Celluloid::Error attr_reader :cause def initialize(cause) @cause = cause super "caused by #{cause.inspect}: #{cause.to_s}" @@ -75,12 +75,12 @@ end # Obtain all running actors in the system def all actors = [] - Thread.list.each do |t| - next unless t.celluloid? && t.role == :actor + Celluloid.internal_pool.each do |t| + next unless t.role == :actor actors << t.actor.proxy if t.actor && t.actor.respond_to?(:proxy) end actors end @@ -119,14 +119,11 @@ end # Forcibly kill a given actor def kill(actor) actor.thread.kill - begin - actor.mailbox.shutdown - rescue DeadActorError - end + actor.mailbox.shutdown if actor.mailbox.alive? end # Wait for an actor to terminate def join(actor, timeout = nil) actor.thread.join(timeout) @@ -134,16 +131,19 @@ end end # Wrap the given subject with an Actor def initialize(subject, options = {}) - @subject = subject - @mailbox = options[:mailbox] || Mailbox.new + @subject = subject + + @mailbox = options.fetch(:mailbox_class, Mailbox).new + @mailbox.max_size = options.fetch(:mailbox_size, nil) + + @task_class = options[:task_class] || Celluloid.task_class @exit_handler = options[:exit_handler] @exclusives = options[:exclusive_methods] @receiver_block_executions = options[:receiver_block_executions] - @task_class = options[:task_class] || Celluloid.task_class @tasks = TaskSet.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @@ -191,32 +191,13 @@ # Terminate this actor def terminate @running = false end - # Is this actor running in exclusive mode? - def exclusive? - @exclusive - end - - # Execute a code block in exclusive mode. - def exclusive - if @exclusive - yield - else - begin - @exclusive = true - yield - ensure - @exclusive = false - end - end - end - # Perform a linking request with another actor def linking_request(receiver, type) - exclusive do + Celluloid.exclusive do start_time = Time.now receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] loop do @@ -243,11 +224,11 @@ end end # Send a signal with the given name to all waiting methods def signal(name, value = nil) - @signals.send name, value + @signals.broadcast name, value end # Wait for the given signal def wait(name) @signals.wait name @@ -312,42 +293,46 @@ def handle_message(message) case message when SystemEvent handle_system_event message when Call - task(:call, message.method) { - if @receiver_block_executions && meth = message.method - if meth == :__send__ - meth = message.arguments.first - end - if @receiver_block_executions.include?(meth.to_sym) - message.execute_block_on_receiver - end + meth = message.method + if meth == :__send__ + meth = message.arguments.first + end + if @receiver_block_executions && meth + if @receiver_block_executions.include?(meth.to_sym) + message.execute_block_on_receiver end + end + + task(:call, :method_name => meth, :dangerous_suspend => meth == :initialize) { message.dispatch(@subject) } when BlockCall task(:invoke_block) { message.dispatch } when BlockResponse, Response message.dispatch else - @receivers.handle_message(message) + unless @receivers.handle_message(message) + Logger.debug "Discarded message (unhandled): #{message}" + end end message end # Handle high-priority system event messages def handle_system_event(event) case event when ExitEvent - task(:exit_handler, @exit_handler) { handle_exit_event event } + task(:exit_handler, :method_name => @exit_handler) { handle_exit_event event } when LinkingRequest event.process(links) when NamingRequest @name = event.name when TerminationRequest - @running = false + terminate when SignalConditionRequest event.call end end @@ -380,47 +365,40 @@ Thread.current[:celluloid_mailbox] = nil end # Run the user-defined finalizer, if one is set def run_finalizer - # FIXME: remove before Celluloid 1.0 - if @subject.respond_to?(:finalize) && @subject.class.finalizer != :finalize - Logger.warn("DEPRECATION WARNING: #{@subject.class}#finalize is deprecated and will be removed in Celluloid 1.0. " + - "Define finalizers with '#{@subject.class}.finalizer :callback.'") - - task(:finalizer, :finalize) { @subject.finalize } - end - finalizer = @subject.class.finalizer if finalizer && @subject.respond_to?(finalizer, true) - task(:finalizer, :finalize) { @subject.__send__(finalizer) } + task(:finalizer, :method_name => finalizer, :dangerous_suspend => true) { @subject.__send__(finalizer) } end rescue => ex Logger.crash("#{@subject.class}#finalize crashed!", ex) end # Clean up after this actor def cleanup(exit_event) @mailbox.shutdown @links.each do |actor| - begin + if actor.mailbox.alive? actor.mailbox << exit_event - rescue MailboxError - # We're exiting/crashing, they're dead. Give up :( end end - tasks.each { |task| task.terminate } + tasks.to_a.each { |task| task.terminate } rescue => ex Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex) end # Run a method inside a task unless it's exclusive - def task(task_type, method_name = nil, &block) - if @exclusives && (@exclusives == :all || (method_name && @exclusives.include?(method_name.to_sym))) - exclusive { block.call } - else - @task_class.new(task_type, &block).resume - end + def task(task_type, meta = nil) + method_name = meta && meta.fetch(:method_name, nil) + @task_class.new(task_type, meta) { + if @exclusives && (@exclusives == :all || (method_name && @exclusives.include?(method_name.to_sym))) + Celluloid.exclusive { yield } + else + yield + end + }.resume end end end