# coding: utf-8 require 'sshkit' require 'thread' module SSHKit module Runner class SafeSequential < Sequential attr_reader :failed, :succeeded def initialize(hosts, options = nil, &block) options ||= {} @on_errors = options.delete(:on_errors) || :exit super(hosts, options, &block) @failed = [] @succeeded = [] end def execute super rescue return false else return @failed.length == 0 end private def run_backend(host, &block) backend(host, &block).run rescue => e @failed << { host: host, error: e } raise if @on_errors == :exit else @succeeded << host end end class SafeParallel < SafeSequential attr_writer :group_size def initialize(hosts, options = nil, &block) super(hosts, options, &block) @failed = Queue.new @succeeded = Queue.new end def execute hosts.each_slice(group_size).each do |slice| threads = [] slice.each do |host| threads << Thread.new(host) do |h| begin backend(h, &block).run succeeded << host rescue => e failed << { host: host, error: e } end end end threads.map(&:join) return convert_results if failed.length > 0 && @on_errors == :exit sleep wait_interval end convert_results end private def group_size @group_size || options[:limit] || 2 end def convert_results @failed = queue_to_array(failed) @succeeded = queue_to_array(succeeded) return true if failed.length == 0 end def queue_to_array(queue) res = [] begin loop do obj = queue.pop(true) break unless obj res << obj end rescue ThreadError return res end res end end end end