lib/celluloid/pool_manager.rb in celluloid-0.11.1 vs lib/celluloid/pool_manager.rb in celluloid-0.12.0.pre

- old
+ new

@@ -1,33 +1,56 @@ +require 'set' + module Celluloid # Manages a fixed-size pool of workers # Delegates work (i.e. methods) and supervises workers # Don't use this class directly. Instead use MyKlass.pool class PoolManager include Celluloid trap_exit :crash_handler def initialize(worker_class, options = {}) - @size = options[:size] - raise ArgumentError, "minimum pool size is 2" if @size && @size < 2 + @size = options[:size] || [Celluloid.cores, 2].max + raise ArgumentError, "minimum pool size is 2" if @size < 2 - @size ||= [Celluloid.cores, 2].max + @worker_class = worker_class @args = options[:args] ? Array(options[:args]) : [] - @worker_class = worker_class @idle = @size.times.map { worker_class.new_link(*@args) } + + # FIXME: Another data structure (e.g. Set) would be more appropriate + # here except it causes MRI to crash :o + @busy = [] end + def finalize + terminators = (@idle + @busy).each do |actor| + begin + actor.future(:terminate) + rescue DeadActorError, MailboxError + end + end + + terminators.compact.each { |terminator| terminator.value rescue nil } + end + def _send_(method, *args, &block) worker = __provision_worker begin worker._send_ method, *args, &block + rescue DeadActorError # if we get a dead actor out of the pool + wait :respawn_complete + worker = __provision_worker + retry rescue Exception => ex abort ex ensure - @idle << worker if worker.alive? + if worker.alive? + @idle << worker + @busy.delete worker + end end end def name _send_ @mailbox, :name @@ -54,26 +77,32 @@ end # Provision a new worker def __provision_worker while @idle.empty? - # Using exclusive mode blocks incoming messages, so they don't pile - # up as waiting Celluloid::Tasks - response = exclusive { receive { |msg| msg.is_a? Response } } + # Wait for responses from one of the busy workers + response = exclusive { receive { |msg| msg.is_a?(Response) } } Thread.current[:actor].handle_message(response) end - @idle.shift + + worker = @idle.shift + @busy << worker + + worker end # Spawn a new worker for every crashed one def crash_handler(actor, reason) + @busy.delete actor @idle.delete actor - return unless reason # don't restart workers that exit cleanly + return unless reason + @idle << @worker_class.new_link(*@args) + signal :respawn_complete end def respond_to?(method) - super || (@worker_class ? @worker_class.instance_methods.include?(method.to_sym) : false) + super || @worker_class.instance_methods.include?(method.to_sym) end def method_missing(method, *args, &block) if respond_to?(method) _send_ method, *args, &block