# frozen_string_literal: true require 'legion/transport/messages/task_check_subtask' require 'legion/transport/messages/task_update' module Legion module Runner # Base running class that will run everything class Runner def initialize(klass, method, options = {}) # rubocop:disable Metrics/AbcSize klass = Kernel.const_get(klass) if klass.is_a? String result = if options[:args].nil? klass.send(method) else klass.send(method, options[:args]) end update_status(options[:task_id]) unless options[:task_id].nil? Legion::Transport::Messages::TaskCheckSubtask.new(klass, method, result, options).publish rescue StandardError => ex Legion::Logging.error(ex.message) Legion::Logging.warn(ex.backtrace) return if options[:task_id].nil? update_status(options[:task_id], 'task.exception', msg: ex.message) end def update_status(_task_id, status = 'task.completed', options = {}) Legion::Transport::Messages::TaskUpdate.new(options[:task_id], status, options).publish # if Legion::Settings[:data][:connected] # update_status_database(task_id, status) # else # update_status_rmq(task_id, status) # end end def update_status_rmq(_task_id, status = 'task.completed', options = {}) Legion::Transport::Messages::TaskUpdate.new(options[:task_id], status).publish end def update_status_database(task_id, _status = 'task.completed', _options = {}) require 'legion/data/models/task' task = Legion::Data::Model::Task[task_id] task.update(status: 'task.completed') end end end end # Status # task.scheduled # condition.queued # condition.failed # condition.succeeded # condition.exception # transformation.queued # transformation.succeeded # transformation.exception # task.completed