lib/legion/extensions/tasker/runners/check_subtask.rb in lex-tasker-0.1.3 vs lib/legion/extensions/tasker/runners/check_subtask.rb in lex-tasker-0.2.0

- old
+ new

@@ -2,117 +2,96 @@ module Legion::Extensions::Tasker module Runners module CheckSubtask include Legion::Extensions::Helpers::Lex + extend Legion::Extensions::Tasker::Helpers::FindSubtask def check_subtasks(runner_class:, function:, **opts) - runner_record = Legion::Data::Model::Runner[namespace: runner_class] - return if runner_record.nil? + trigger = find_trigger(runner_class: runner_class, function: function) - function_record = runner_record.functions_dataset[name: function] - return if function_record.nil? - - relationships = function_record.trigger_relationships_dataset.where(:active) - relationships.where(chain_id: opts[:chain_id] || :allow_new_chains) if opts.key? :chain_id - return { success: true, count: relationships.count } if relationships.count.zero? - - relationships.each do |relationship| - unless relationship.values[:allow_new_chains] - next if relationship.chain.nil? + find_subtasks(trigger_id: trigger[:function_id]).each do |relationship| + unless relationship[:allow_new_chains] + next if relationship[:chain_id].nil? next unless opts.key? :chain_id - next unless relationship.values[:chain_id] == opts[:chain_id] + next unless relationship[:chain_id] == opts[:chain_id] end - action_function = relationship.action - action_runner = action_function.runner + task_hash = relationship + task_hash[:status] = relationship[:delay].zero? ? 'conditioner.queued' : 'task.delayed' + task_hash[:payload] = opts - status = relationship.values[:delay].zero? ? 'conditioner.queued' : 'task.delayed' - - task_id_hash = { runner_class: action_runner.values[:namespace], - function: action_function.values[:name], - status: status, - relationship_id: relationship.values[:id] } - task_id_hash[:payload] = opts - if opts.key? :master_id - task_id_hash[:master_id] = opts[:master_id] + task_hash[:master_id] = opts[:master_id] elsif opts.key? :parent_id - task_id_hash[:master_id] = opts[:parent_id] + task_hash[:master_id] = opts[:parent_id] elsif opts.key? :task_id - task_id_hash[:master_id] = opts[:task_id] + task_hash[:master_id] = opts[:task_id] end - task_id_hash[:parent_id] = opts[:task_id] if opts.key? :task_id + task_hash[:parent_id] = opts[:task_id] if opts.key? :task_id + task_hash[:routing_key] = if relationship[:conditions].is_a?(String) && relationship[:conditions].length > 4 + 'task.subtask.conditioner' + elsif relationship[:transformation].is_a?(String) && relationship[:transformation].length > 4 # rubocop:disable Layout/LineLength + 'task.subtask.transformation' + else + relationship[:runner_routing_key] + end + if opts[:result].is_a? Array opts[:result].each do |result| - send_task(task_id_hash, - relationship: relationship, - runner_record: runner_record, - function_record: function_record, - action_function: action_function, - action_runner: action_runner, - result: result) + send_task(results: result, + trigger_runner_id: trigger[:runner_id], + trigger_function_id: trigger[:function_id], + **task_hash) end else - send_task(task_id_hash, - relationship: relationship, - runner_record: runner_record, - function_record: function_record, - action_function: action_function, - action_runner: action_runner, - **opts) + results = if opts[:results].is_a? Hash + opts[:results] + elsif opts[:result].is_a? Hash + opts[:result] + else + opts + end + send_task( + results: results, + trigger_runner_id: trigger[:runner_id], + trigger_function_id: trigger[:function_id], + **task_hash + ) end end - rescue StandardError => e - Legion::Logging.fatal e.message - Legion::Logging.fatal e.backtrace - Legion::Logging.fatal runner_class - Legion::Logging.fatal function - Legion::Logging.fatal opts.keys - Legion::Logging.fatal opts[:entry] end - def send_task(task_id_hash, relationship:, runner_record:, function_record:, action_function:, action_runner:, **opts) # rubocop:disable Layout/LineLength - task_id = Legion::Runner::Status.generate_task_id(**task_id_hash)[:task_id] + def send_task(**opts) + opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results) + opts[:success] = if opts.key?(:result) && opts.key?(:success) + opts[:result][:success] + elsif opts.key?(:success) + opts[:success] + else + 1 + end - return { status: true } unless relationship.values[:delay].zero? + # opts[:task_id] = Legion::Runner::Status.generate_task_id(**opts)[:task_id] + opts[:task_id] = insert_task(**opts) + return { status: true } unless opts[:delay].zero? - subtask_hash = { - relationship_id: relationship.values[:id], - chain_id: relationship.values[:chain_id], - trigger_runner_id: runner_record.values[:id], - trigger_function_id: function_record.values[:id], - function_id: action_function.values[:id], - function: action_function.values[:name], - runner_id: action_runner.values[:id], - runner_class: action_runner.values[:namespace], - conditions: relationship.values[:conditions], - transformation: relationship.values[:transformation], - debug: relationship.values[:debug] && 1 || 0, - task_id: task_id, - results: opts[:result] - } + Legion::Transport::Messages::SubTask.new(**opts).publish + end - subtask_hash[:routing_key] = if subtask_hash[:conditions].is_a?(String) && subtask_hash[:conditioners].length > 4 # rubocop:disable Layout/LineLength - 'task.subtask.conditioner' - elsif subtask_hash[:transformation].is_a?(String) && subtask_hash[:transformation].length > 4 # rubocop:disable Layout/LineLength - 'task.subtask.transform' - else - "#{runner_record.extension.values[:exchange]}.#{runner_record.values[:queue]}.#{subtask_hash[:function]}" # rubocop:disable Layout/LineLength - end - - subtask_hash[:success] = if opts.nil? - 1 - elsif opts.key?(:result) - # opts[:result][:success] - 1 - elsif opts.key?(:success) - opts[:success] - else - 1 - end - Legion::Transport::Messages::SubTask.new(**subtask_hash).publish + def insert_task(relationship_id:, function_id:, status: 'task.queued', master_id: nil, parent_id: nil, **opts) + insert_hash = { relationship_id: relationship_id, function_id: function_id, status: status } + insert_hash[:master_id] = if master_id.is_a? Integer + master_id + elsif parent_id.is_a? Integer + parent_id + end + insert_hash[:parent_id] = parent_id if parent_id.is_a? Integer + insert_hash[:payload] = Legion::JSON.dump(opts) + # insert_hash[:function_args] = nil + # insert_hash[:results] = nil + Legion::Data::Model::Task.insert(insert_hash) end end end end