lib/choria/orchestrator/task.rb in choria-colt-0.4.0 vs lib/choria/orchestrator/task.rb in choria-colt-0.5.0

- old
+ new

@@ -1,25 +1,23 @@ -require 'choria/colt/data_structurer' +require_relative 'task/result_set' require 'active_support' require 'active_support/core_ext/hash/indifferent_access' module Choria class Orchestrator class Task class Error < Orchestrator::Error; end + class NoNodesLeftError < Error; end - attr_reader :id, :name, :input, :environment, :rpc_results, :results - attr_accessor :rpc_responses + attr_reader :id, :name, :input, :environment def initialize(orchestrator:, id: nil, name: nil, input: {}, environment: 'production') @id = id @name = name @environment = environment @orchestrator = orchestrator - @results = [] - return if @name.nil? @input = default_input.merge input logger.debug "Task inputs: #{input}" validate_inputs @@ -31,75 +29,86 @@ def files metadata['files'].to_json end - def wait # rubocop:disable Metrics/AbcSize - if @id.nil? - rpc_responses_ok, rpc_responses_error = rpc_responses.partition { |res| (res[:body][:statuscode]).zero? } - rpc_responses_error.each do |res| - logger.error "Task request failed on '#{res[:senderid]}':\n#{pp res}" - end + def results + result_set.results + end - task_ids = rpc_responses_ok.map { |res| res[:body][:data][:task_id] }.uniq + def run + raise Error, 'Unable to run a task by ID' if name.nil? - raise NotImplementedError, "Multiple task IDs: #{task_ids}" unless task_ids.count == 1 + @pending_targets = rpc_client.discover + _download + _run_no_wait + end - @id = task_ids.first - end + def wait + raise Error, 'Task ID is required!' if @id.nil? - wait_results + logger.info "Waiting task #{@id} results…" + @rpc_results = [] + loop do + self.rpc_results = rpc_client.task_status(task_id: @id).map(&:results) + break if @pending_targets.empty? + end end def on_result(&block) - @on_result = lambda { |result| - block.call(result) - } + @on_result = ->(result) { block.call(result) } end private + def result_set + @result_set ||= ResultSet.new(on_result: @on_result) + end + def rpc_results=(results) - new_result_hosts = (results.map { |res| res[:sender] }) - (@results.map { |res| res[:sender] }) + pending_results, completed_results = results.partition { |res| res[:data][:exitcode] == -1 } + @pending_targets ||= pending_results.map { |res| res[:sender] } - new_result_hosts.each do |host| - result = results.find { |res| res[:sender] == host } - - next unless result[:data][:exitcode] != -1 - - logger.debug "New result for task ##{@id}: #{result}" - structured_result = Choria::Colt::DataStructurer.structure(result).with_indifferent_access - - @on_result&.call(structured_result) - - @results << structured_result + new_results = completed_results.select { |res| @pending_targets.include? res[:sender] } + new_results.each do |res| + logger.debug "New result for task ##{@id}: #{res}" + result_set.integrate_result(res) + @pending_targets.delete res[:sender] end - - @rpc_results = results end - def wait_results - raise 'Task ID is required!' if @id.nil? + def process_rpc_response(rpc_response) + rpc_response.extend Orchestrator::RpcResponse + logger.debug " RPC Response: '#{rpc_response}'" + return unless rpc_response.rpc_error? - logger.info "Waiting task #{@id} results…" + @pending_targets.delete rpc_response.sender + result_set.integrate_rpc_error(rpc_response) + end - @results = [] - @rpc_results = [] - - loop do - self.rpc_results = @orchestrator.rpc_client.task_status(task_id: @id).map(&:results) - - break if terminated? + def _download + logger.info "Downloading task '#{name}' on #{rpc_client.discover.size} nodes…" + rpc_client.download(task: name, files: files, verbose: false) do |rpc_response| + process_rpc_response(rpc_response) end + + raise NoNodesLeftError, "No nodes left to continue after 'download' action" if @pending_targets.empty? end - def terminated? - @rpc_results.each do |result| - return false if result[:data][:exitcode] == -1 + def _run_no_wait # rubocop:disable Metrics/AbcSize + logger.info "Starting task '#{name}' on #{rpc_client.discover.size} nodes…" + task_ids = [] + rpc_client.run_no_wait(task: name, files: files, input: input.to_json, verbose: false) do |rpc_response| + process_rpc_response(rpc_response) + task_ids << rpc_response.task_id end + raise NoNodesLeftError, "No nodes left to continue after 'run_no_wait' action" if @pending_targets.empty? - true + task_ids.uniq! + raise NotImplementedError, "Multiple task IDs: #{task_ids}" unless task_ids.count == 1 + + @id = task_ids.first end def _metadata logger.info 'Downloading task metadata from the Puppet Server…' @orchestrator.tasks_support.task_metadata(@name, @environment) @@ -119,9 +128,13 @@ raise Error, reason.sub(/^\n/, '') unless ok end def logger @orchestrator.logger + end + + def rpc_client + @orchestrator.rpc_client end end end end