lib/legion/extensions/conditioner/runners/conditioner.rb in lex-conditioner-0.2.1 vs lib/legion/extensions/conditioner/runners/conditioner.rb in lex-conditioner-0.2.3
- old
+ new
@@ -1,22 +1,26 @@
require 'legion/extensions/conditioner/helpers/condition'
module Legion::Extensions::Conditioner
module Runners
module Conditioner
- def self.check(**payload) # rubocop:disable Metrics/AbcSize
- conditioner = Legion::Extensions::Conditioner::Condition.new(conditions: payload[:conditions],
- task_id: payload[:task_id],
+ def check(conditions:, **payload) # rubocop:disable Metrics/AbcSize
+ conditioner = Legion::Extensions::Conditioner::Condition.new(conditions: conditions,
values: payload,
type: payload[:type])
- if conditioner.valid?
- Legion::Extensions::Conditioner::Transport::Messages::Conditioner.new(**payload).publish
- status = 'task.queued'
- else
- status = 'conditioner.failed'
- end
+ status = if conditioner.valid? && payload.key?(:transformation)
+ 'transformation.queued'
+ elsif conditioner.valid? && payload.key?(:runner_routing_key)
+ 'task.queued'
+ elsif conditioner.valid?
+ 'task.exception'
+ else
+ 'conditioner.failed'
+ end
+
+ send_task(**payload) unless status == 'conditioner.failed'
task_update(payload[:task_id], status, **payload) unless payload[:task_id].nil?
if payload[:debug] && payload.key?(:task_id)
generate_task_log(task_id: payload[:task_id],
function: 'check',
@@ -28,12 +32,27 @@
{ success: true, valid: conditioner.valid? }
rescue StandardError => e
Legion::Logging.error 'LEX::Conditioner::Runners::Condition had an exception'
Legion::Logging.warn e.message
Legion::Logging.warn e.backtrace
- unless payload[:task_id].nil?
- Legion::Transport::Messages::TaskUpdate.new(task_id: payload[:task_id], status: 'conditioner.failed').publish
+ task_update(payload[:task_id], 'conditioner.exception', **payload) unless payload[:task_id].nil?
+ end
+
+ def send_task(**opts)
+ subtask_hash = {}
+ %i[runner_routing_key relationship_id chain_id trigger_runner_id trigger_function_id funciton_id function runner_id runner_class transformation debug task_id results].each do |column| # rubocop:disable Layout/LineLength
+ subtask_hash[column] = opts[column] if opts.key? column
end
+
+ subtask_hash[:routing_key] = if subtask_hash.key? :transformation
+ 'task.subtask.transform'
+ elsif subtask_hash.key? :runner_routing_key
+ subtask_hash[:runner_routing_key]
+ end
+
+ raise Legion::Exception::MissingArgument 'Missing :routing_key' unless subtask_hash.key? :routing_key
+
+ Legion::Transport::Messages::SubTask.new(**subtask_hash).publish
end
include Legion::Extensions::Helpers::Lex
include Legion::Extensions::Helpers::Task
end