lib/choria/orchestrator.rb in choria-colt-0.2.0 vs lib/choria/orchestrator.rb in choria-colt-0.3.0

- old
+ new

@@ -19,64 +19,47 @@ def tasks_support @tasks_support ||= MCollective::Util::Choria.new.tasks_support end - def run(task, targets: nil, verbose: false) - logger.debug "Running task: '#{task.name}' (targets: #{targets.nil? ? 'all' : targets})" + def run(task, targets: nil, targets_with_classes: nil, verbose: false) # rubocop:disable Metrics/AbcSize rpc_client.progress = verbose + logger.debug "Running task: '#{task.name}' (targets: #{targets.nil? ? 'all' : targets})" targets&.each { |target| rpc_client.identity_filter target } + unless targets_with_classes.nil? + logger.debug "Filtering targets with classes: #{targets_with_classes}" + targets_with_classes.each { |klass| rpc_client.class_filter klass } + end + + logger.info 'Discovering targets…' raise DiscoverError, 'No request sent, no node discovered' if rpc_client.discover.size.zero? - logger.info "Attempting to download and run task '#{task.name}' on #{rpc_client.discover.size} nodes" - + logger.info "Downloading task '#{task.name}' on #{rpc_client.discover.size} nodes…" rpc_client.download(task: task.name, files: task.files, verbose: verbose) - # TODO: Extract error from 'rpc' (see MCollective::RPC#printrpc) - responses = [] + logger.info "Starting task '#{task.name}' on #{rpc_client.discover.size} nodes…" rpc_client.run_no_wait(task: task.name, files: task.files, input: task.input.to_json, verbose: verbose) do |response| logger.debug " Response: '#{response}'" responses << response end # TODO: Include stats in logs when logger will be available (see MCollective::RPC#printrpcstats) task.rpc_responses = responses end - def wait_results(task_id:) - raise 'Task ID is required!' if task_id.nil? - - task_status_results = nil - loop do - task_status_results = rpc_client.task_status(task_id: task_id).map(&:results) - logger.debug "Task ##{task_id} status: #{task_status_results}" - break if task_completed? task_status_results - end - - task_status_results - end - - def task_completed?(results) - results.each do |result| - return false unless result[:data][:completed] - end - - true - end - def validate_rpc_result(result) raise Error, "The RPC agent returned an error: #{result[:statusmsg]}" unless (result[:statuscode]).zero? end - private - def rpc_client @rpc_client ||= rpcclient('bolt_tasks', options: rpc_options) end + + private def rpc_options { verbose: false, disctimeout: nil,