lib/celluloid/pool_manager.rb in celluloid-0.11.1 vs lib/celluloid/pool_manager.rb in celluloid-0.12.0.pre
- old
+ new
@@ -1,33 +1,56 @@
+require 'set'
+
module Celluloid
# Manages a fixed-size pool of workers
# Delegates work (i.e. methods) and supervises workers
# Don't use this class directly. Instead use MyKlass.pool
class PoolManager
include Celluloid
trap_exit :crash_handler
def initialize(worker_class, options = {})
- @size = options[:size]
- raise ArgumentError, "minimum pool size is 2" if @size && @size < 2
+ @size = options[:size] || [Celluloid.cores, 2].max
+ raise ArgumentError, "minimum pool size is 2" if @size < 2
- @size ||= [Celluloid.cores, 2].max
+ @worker_class = worker_class
@args = options[:args] ? Array(options[:args]) : []
- @worker_class = worker_class
@idle = @size.times.map { worker_class.new_link(*@args) }
+
+ # FIXME: Another data structure (e.g. Set) would be more appropriate
+ # here except it causes MRI to crash :o
+ @busy = []
end
+ def finalize
+ terminators = (@idle + @busy).each do |actor|
+ begin
+ actor.future(:terminate)
+ rescue DeadActorError, MailboxError
+ end
+ end
+
+ terminators.compact.each { |terminator| terminator.value rescue nil }
+ end
+
def _send_(method, *args, &block)
worker = __provision_worker
begin
worker._send_ method, *args, &block
+ rescue DeadActorError # if we get a dead actor out of the pool
+ wait :respawn_complete
+ worker = __provision_worker
+ retry
rescue Exception => ex
abort ex
ensure
- @idle << worker if worker.alive?
+ if worker.alive?
+ @idle << worker
+ @busy.delete worker
+ end
end
end
def name
_send_ @mailbox, :name
@@ -54,26 +77,32 @@
end
# Provision a new worker
def __provision_worker
while @idle.empty?
- # Using exclusive mode blocks incoming messages, so they don't pile
- # up as waiting Celluloid::Tasks
- response = exclusive { receive { |msg| msg.is_a? Response } }
+ # Wait for responses from one of the busy workers
+ response = exclusive { receive { |msg| msg.is_a?(Response) } }
Thread.current[:actor].handle_message(response)
end
- @idle.shift
+
+ worker = @idle.shift
+ @busy << worker
+
+ worker
end
# Spawn a new worker for every crashed one
def crash_handler(actor, reason)
+ @busy.delete actor
@idle.delete actor
- return unless reason # don't restart workers that exit cleanly
+ return unless reason
+
@idle << @worker_class.new_link(*@args)
+ signal :respawn_complete
end
def respond_to?(method)
- super || (@worker_class ? @worker_class.instance_methods.include?(method.to_sym) : false)
+ super || @worker_class.instance_methods.include?(method.to_sym)
end
def method_missing(method, *args, &block)
if respond_to?(method)
_send_ method, *args, &block