lib/async/container/threaded.rb in async-container-0.9.0 vs lib/async/container/threaded.rb in async-container-0.10.0

- old
+ new

@@ -18,13 +18,12 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'async/reactor' require 'thread' +require_relative 'statistics' -require 'async/io/notification' - module Async module Container # Manages a reactor within one or more threads. class Threaded class Instance @@ -35,48 +34,103 @@ def name= value @thread.name = value end end - def initialize(concurrency: 1, name: nil, &block) - @reactors = concurrency.times.collect do - Async::Reactor.new + def self.run(*args, &block) + self.new.run(*args, &block) + end + + def initialize + @threads = [] + @running = true + @statistics = Statistics.new + end + + attr :statistics + + def run(count: Container.processor_count, **options, &block) + count.times do + async(**options, &block) end - @threads = @reactors.collect do |reactor| - ::Thread.new do - thread = ::Thread.current - - thread.abort_on_exception = true - thread.name = name if name - + return self + end + + def spawn(name: nil, restart: false, &block) + @statistics.spawn! + + thread = ::Thread.new do + thread = ::Thread.current + + thread.report_on_exception = false + thread.name = name if name + + instance = Instance.new(thread) + + while @running begin - reactor.run(Instance.new(thread), &block) - rescue Interrupt - # Exit cleanly. + yield instance + rescue Exception => exception + Async.logger.error(self) {exception} + + @statistics.failure! end + + if restart + @statistics.restart! + else + break + end end end - @finished = nil + @threads << thread + + return self end + def async(name: nil, restart: false, &block) + spawn do |instance| + begin + Async::Reactor.run(instance, &block) + rescue Interrupt + # Graceful exit. + end + end + + return self + end + def self.multiprocess? false end def wait - return if @finished + yield if block_given? @threads.each(&:join) - - @finished = true + @threads.clear + + return nil + rescue Interrupt + # Graceful exit. + Async.logger.debug(self) {$!} end - def stop - @reactors.each(&:stop) + # Gracefully shut down all reactors. + def stop(graceful = true, &block) + @running = false - wait + if graceful + @threads.each{|thread| thread.raise(Interrupt)} + else + @threads.each(&:kill) + end + + self.wait(&block) + ensure + @running = true end end end end