lib/legion/extensions/conditioner/runners/conditioner.rb in lex-conditioner-0.2.1 vs lib/legion/extensions/conditioner/runners/conditioner.rb in lex-conditioner-0.2.3

- old
+ new

@@ -1,22 +1,26 @@ require 'legion/extensions/conditioner/helpers/condition' module Legion::Extensions::Conditioner module Runners module Conditioner - def self.check(**payload) # rubocop:disable Metrics/AbcSize - conditioner = Legion::Extensions::Conditioner::Condition.new(conditions: payload[:conditions], - task_id: payload[:task_id], + def check(conditions:, **payload) # rubocop:disable Metrics/AbcSize + conditioner = Legion::Extensions::Conditioner::Condition.new(conditions: conditions, values: payload, type: payload[:type]) - if conditioner.valid? - Legion::Extensions::Conditioner::Transport::Messages::Conditioner.new(**payload).publish - status = 'task.queued' - else - status = 'conditioner.failed' - end + status = if conditioner.valid? && payload.key?(:transformation) + 'transformation.queued' + elsif conditioner.valid? && payload.key?(:runner_routing_key) + 'task.queued' + elsif conditioner.valid? + 'task.exception' + else + 'conditioner.failed' + end + + send_task(**payload) unless status == 'conditioner.failed' task_update(payload[:task_id], status, **payload) unless payload[:task_id].nil? if payload[:debug] && payload.key?(:task_id) generate_task_log(task_id: payload[:task_id], function: 'check', @@ -28,12 +32,27 @@ { success: true, valid: conditioner.valid? } rescue StandardError => e Legion::Logging.error 'LEX::Conditioner::Runners::Condition had an exception' Legion::Logging.warn e.message Legion::Logging.warn e.backtrace - unless payload[:task_id].nil? - Legion::Transport::Messages::TaskUpdate.new(task_id: payload[:task_id], status: 'conditioner.failed').publish + task_update(payload[:task_id], 'conditioner.exception', **payload) unless payload[:task_id].nil? + end + + def send_task(**opts) + subtask_hash = {} + %i[runner_routing_key relationship_id chain_id trigger_runner_id trigger_function_id funciton_id function runner_id runner_class transformation debug task_id results].each do |column| # rubocop:disable Layout/LineLength + subtask_hash[column] = opts[column] if opts.key? column end + + subtask_hash[:routing_key] = if subtask_hash.key? :transformation + 'task.subtask.transform' + elsif subtask_hash.key? :runner_routing_key + subtask_hash[:runner_routing_key] + end + + raise Legion::Exception::MissingArgument 'Missing :routing_key' unless subtask_hash.key? :routing_key + + Legion::Transport::Messages::SubTask.new(**subtask_hash).publish end include Legion::Extensions::Helpers::Lex include Legion::Extensions::Helpers::Task end