module Actions
  module Pulp
    # Base action for Pulp operations whose work is carried out by one or more
    # Pulp tasks. The tasks being tracked are stored in output[:pulp_tasks];
    # they are refreshed through task_resource.poll and can be cancelled
    # through task_resource.cancel.
    #
    # NOTE(review): the polling/suspend life cycle itself comes from
    # Actions::Base::Polling and ::Dynflow::Action::Cancellable, which are not
    # defined in this file.
    class AbstractAsyncTask < Pulp::Abstract
      include Actions::Base::Polling
      include ::Dynflow::Action::Cancellable

      # Task states that count as "no longer running" for #done?.
      FINISHED_STATES = %w(finished error canceled skipped).freeze

      # A call report (documented http://pulp-dev-guide.readthedocs.org/en/latest/conventions/sync-v-async.html)
      # Looks like:
      #   {
      #     "result": {},
      #     "error": {},
      #     "spawned_tasks": [{"_href": "/pulp/api/v2/tasks/7744e2df-39b9-46f0-bb10-feffa2f7014b/",
      #                        "task_id": "7744e2df-39b9-46f0-bb10-feffa2f7014b" }]
      #   }
      #
      #
      # A Task (documented http://pulp-dev-guide.readthedocs.org/en/latest/integration/rest-api/dispatch/task.html#task-management)
      # Looks like:
      #   {
      #     "_href": "/pulp/api/v2/tasks/0fe4fcab-a040-11e1-a71c-00508d977dff/",
      #     "state": "running",
      #     "queue": "reserved_resource_worker-0@your.domain.com",
      #     "task_id": "0fe4fcab-a040-11e1-a71c-00508d977dff",
      #     "task_type": "pulp.server.tasks.repository.sync_with_auto_publish",
      #     "progress_report": {}, # contents depend on the operation
      #     "result": null,
      #     "start_time": "2012-05-17T16:48:00Z",
      #     "finish_time": null,
      #     "exception": null,
      #     "traceback": null,
      #     "tags": [
      #       "pulp:repository:f16",
      #       "pulp:action:sync"
      #     ],
      #     "spawned_tasks": [{"href": "/pulp/api/v2/tasks/7744e2df-39b9-46f0-bb10-feffa2f7014b/",
      #                        "task_id": "7744e2df-39b9-46f0-bb10-feffa2f7014b" }],
      #     "error": null
      #   }

      # Delegates to the parent implementation, except when the action is
      # being skipped, in which case nothing is done.
      def run(event = nil)
        super unless event == Dynflow::Action::Skip
      end

      # Human readable description of what the action is currently doing.
      # Only the :running and :suspended states are described here; every
      # other state is delegated to super.
      def humanized_state
        case state
        when :running
          if external_task.nil?
            _("initiating Pulp task")
          else
            _("checking Pulp task status")
          end
        when :suspended
          if external_task&.all? { |task| task[:start_time].nil? }
            _("waiting for Pulp to start the task")
          else
            _("waiting for Pulp to finish the task")
          end
        else
          super
        end
      end

      # Truthy when every tracked task either has a finish time or is in one
      # of FINISHED_STATES. Returns nil while no task list has been recorded
      # yet, and true for an empty task list.
      def done?
        external_task&.all? { |task| task[:finish_time] || FINISHED_STATES.include?(task[:state]) }
      end

      # The Pulp tasks currently tracked by this action (nil until assigned).
      def external_task
        output[:pulp_tasks]
      end

      # Requests cancellation of all tracked tasks and refreshes their state.
      def cancel!
        cancel
        self.external_task = poll_external_task
        # We suspend the action and the polling will take care of finding
        # out if the cancelling was successful
        suspend unless done?
      end

      # Asks Pulp to cancel every tracked task.
      def cancel
        output[:pulp_tasks].each do |pulp_task|
          task_resource.cancel(pulp_task['task_id'])
          # the main task may have completed, so cancel spawned tasks too
          pulp_task['spawned_tasks']&.each { |spawned| task_resource.cancel(spawned['task_id']) }
        end
      end

      # Pulp errors are raised as they are; anything else is handled by super.
      def rescue_external_task(error)
        if error.is_a?(::Katello::Errors::PulpError)
          fail error
        else
          super
        end
      end

      private

      # Stores the given task data in output[:pulp_tasks].
      #
      # Accepts nil (treated as no tasks), a single Hash, or a list of
      # hashes. Tasks spawned by the given entries are polled and added to
      # the list. Entries without a task_id (call reports) and entries
      # carrying any of the ignored_tags are dropped.
      #
      # Raises the error built by ::Katello::Errors::PulpError.from_task for
      # the first stored task for which it returns one.
      def external_task=(external_task_data)
        external_task_data = [] if external_task_data.nil?
        external_task_data = [external_task_data] if external_task_data.is_a?(Hash)

        new_tasks = []
        external_task_data.each do |task|
          spawned_ids = spawned_ids_for(task)
          next if spawned_ids.empty?

          # Tasks already discovered for earlier entries count as known, so
          # a task spawned by several parents is polled and stored only once.
          new_tasks.concat(get_new_tasks(external_task_data + new_tasks, spawned_ids))
        end

        # Combine new tasks and remove call reports and ignored tasks
        output[:pulp_tasks] = (external_task_data + new_tasks).reject do |task|
          task["task_id"].nil? || (task["tags"] && (task["tags"] & ignored_tags).present?)
        end

        output[:pulp_tasks].each do |pulp_task|
          if (pulp_exception = ::Katello::Errors::PulpError.from_task(pulp_task))
            fail pulp_exception
          end
        end
      end

      # Polls the tasks named in spawned_task_ids that are not yet part of
      # current_list, then recursively does the same for whatever those
      # tasks spawned in turn. Returns the list of newly polled tasks
      # (empty when there is nothing new).
      def get_new_tasks(current_list, spawned_task_ids)
        known_ids = current_list.map { |task| task['task_id'] }
        # uniq: Array#- keeps repeated ids, which would be polled repeatedly
        new_tasks = (spawned_task_ids - known_ids).uniq.map do |task_id|
          task_resource.poll(task_id)
        end
        return [] if new_tasks.empty?

        spawned = new_tasks.flat_map { |task| spawned_ids_for(task) }
        new_tasks + get_new_tasks(current_list + new_tasks, spawned)
      end

      # Ids of the tasks spawned by the given task or call report. A missing
      # or nil "spawned_tasks" entry is treated as "spawned nothing", the
      # same way #cancel treats it.
      def spawned_ids_for(task)
        (task['spawned_tasks'] || []).map { |spawned| spawned['task_id'] }
      end

      # Fetches the current state of every tracked task.
      def poll_external_task
        external_task.map do |task|
          task_resource.poll(task[:task_id])
        end
      end

      # Pulp resource used to poll and cancel tasks.
      def task_resource
        pulp_resources.task
      end

      # Tags identifying tasks that should not be tracked; none here.
      def ignored_tags
        []
      end
    end
  end
end