lib/choria/orchestrator/task.rb in choria-colt-0.4.0 vs lib/choria/orchestrator/task.rb in choria-colt-0.5.0
- old
+ new
@@ -1,25 +1,23 @@
-require 'choria/colt/data_structurer'
+require_relative 'task/result_set'
require 'active_support'
require 'active_support/core_ext/hash/indifferent_access'
module Choria
class Orchestrator
class Task
class Error < Orchestrator::Error; end
+ class NoNodesLeftError < Error; end
- attr_reader :id, :name, :input, :environment, :rpc_results, :results
- attr_accessor :rpc_responses
+ attr_reader :id, :name, :input, :environment
def initialize(orchestrator:, id: nil, name: nil, input: {}, environment: 'production')
@id = id
@name = name
@environment = environment
@orchestrator = orchestrator
- @results = []
-
return if @name.nil?
@input = default_input.merge input
logger.debug "Task inputs: #{input}"
validate_inputs
@@ -31,75 +29,86 @@
def files
metadata['files'].to_json
end
- def wait # rubocop:disable Metrics/AbcSize
- if @id.nil?
- rpc_responses_ok, rpc_responses_error = rpc_responses.partition { |res| (res[:body][:statuscode]).zero? }
- rpc_responses_error.each do |res|
- logger.error "Task request failed on '#{res[:senderid]}':\n#{pp res}"
- end
+ def results
+ result_set.results
+ end
- task_ids = rpc_responses_ok.map { |res| res[:body][:data][:task_id] }.uniq
+ def run
+ raise Error, 'Unable to run a task by ID' if name.nil?
- raise NotImplementedError, "Multiple task IDs: #{task_ids}" unless task_ids.count == 1
+ @pending_targets = rpc_client.discover
+ _download
+ _run_no_wait
+ end
- @id = task_ids.first
- end
+ def wait
+ raise Error, 'Task ID is required!' if @id.nil?
- wait_results
+ logger.info "Waiting task #{@id} results…"
+ @rpc_results = []
+ loop do
+ self.rpc_results = rpc_client.task_status(task_id: @id).map(&:results)
+ break if @pending_targets.empty?
+ end
end
def on_result(&block)
- @on_result = lambda { |result|
- block.call(result)
- }
+ @on_result = ->(result) { block.call(result) }
end
private
+ def result_set
+ @result_set ||= ResultSet.new(on_result: @on_result)
+ end
+
def rpc_results=(results)
- new_result_hosts = (results.map { |res| res[:sender] }) - (@results.map { |res| res[:sender] })
+ pending_results, completed_results = results.partition { |res| res[:data][:exitcode] == -1 }
+ @pending_targets ||= pending_results.map { |res| res[:sender] }
- new_result_hosts.each do |host|
- result = results.find { |res| res[:sender] == host }
-
- next unless result[:data][:exitcode] != -1
-
- logger.debug "New result for task ##{@id}: #{result}"
- structured_result = Choria::Colt::DataStructurer.structure(result).with_indifferent_access
-
- @on_result&.call(structured_result)
-
- @results << structured_result
+ new_results = completed_results.select { |res| @pending_targets.include? res[:sender] }
+ new_results.each do |res|
+ logger.debug "New result for task ##{@id}: #{res}"
+ result_set.integrate_result(res)
+ @pending_targets.delete res[:sender]
end
-
- @rpc_results = results
end
- def wait_results
- raise 'Task ID is required!' if @id.nil?
+ def process_rpc_response(rpc_response)
+ rpc_response.extend Orchestrator::RpcResponse
+ logger.debug " RPC Response: '#{rpc_response}'"
+ return unless rpc_response.rpc_error?
- logger.info "Waiting task #{@id} results…"
+ @pending_targets.delete rpc_response.sender
+ result_set.integrate_rpc_error(rpc_response)
+ end
- @results = []
- @rpc_results = []
-
- loop do
- self.rpc_results = @orchestrator.rpc_client.task_status(task_id: @id).map(&:results)
-
- break if terminated?
+ def _download
+ logger.info "Downloading task '#{name}' on #{rpc_client.discover.size} nodes…"
+ rpc_client.download(task: name, files: files, verbose: false) do |rpc_response|
+ process_rpc_response(rpc_response)
end
+
+ raise NoNodesLeftError, "No nodes left to continue after 'download' action" if @pending_targets.empty?
end
- def terminated?
- @rpc_results.each do |result|
- return false if result[:data][:exitcode] == -1
+ def _run_no_wait # rubocop:disable Metrics/AbcSize
+ logger.info "Starting task '#{name}' on #{rpc_client.discover.size} nodes…"
+ task_ids = []
+ rpc_client.run_no_wait(task: name, files: files, input: input.to_json, verbose: false) do |rpc_response|
+ process_rpc_response(rpc_response)
+ task_ids << rpc_response.task_id
end
+ raise NoNodesLeftError, "No nodes left to continue after 'run_no_wait' action" if @pending_targets.empty?
- true
+ task_ids.uniq!
+ raise NotImplementedError, "Multiple task IDs: #{task_ids}" unless task_ids.count == 1
+
+ @id = task_ids.first
end
def _metadata
logger.info 'Downloading task metadata from the Puppet Server…'
@orchestrator.tasks_support.task_metadata(@name, @environment)
@@ -119,9 +128,13 @@
raise Error, reason.sub(/^\n/, '') unless ok
end
def logger
@orchestrator.logger
+ end
+
+ def rpc_client
+ @orchestrator.rpc_client
end
end
end
end