lib/legion/extensions/tasker/runners/check_subtask.rb in lex-tasker-0.1.3 vs lib/legion/extensions/tasker/runners/check_subtask.rb in lex-tasker-0.2.0
- old
+ new
@@ -2,117 +2,96 @@
module Legion::Extensions::Tasker
module Runners
module CheckSubtask
include Legion::Extensions::Helpers::Lex
+ extend Legion::Extensions::Tasker::Helpers::FindSubtask
def check_subtasks(runner_class:, function:, **opts)
- runner_record = Legion::Data::Model::Runner[namespace: runner_class]
- return if runner_record.nil?
+ trigger = find_trigger(runner_class: runner_class, function: function)
- function_record = runner_record.functions_dataset[name: function]
- return if function_record.nil?
-
- relationships = function_record.trigger_relationships_dataset.where(:active)
- relationships.where(chain_id: opts[:chain_id] || :allow_new_chains) if opts.key? :chain_id
- return { success: true, count: relationships.count } if relationships.count.zero?
-
- relationships.each do |relationship|
- unless relationship.values[:allow_new_chains]
- next if relationship.chain.nil?
+ find_subtasks(trigger_id: trigger[:function_id]).each do |relationship|
+ unless relationship[:allow_new_chains]
+ next if relationship[:chain_id].nil?
next unless opts.key? :chain_id
- next unless relationship.values[:chain_id] == opts[:chain_id]
+ next unless relationship[:chain_id] == opts[:chain_id]
end
- action_function = relationship.action
- action_runner = action_function.runner
+ task_hash = relationship
+ task_hash[:status] = relationship[:delay].zero? ? 'conditioner.queued' : 'task.delayed'
+ task_hash[:payload] = opts
- status = relationship.values[:delay].zero? ? 'conditioner.queued' : 'task.delayed'
-
- task_id_hash = { runner_class: action_runner.values[:namespace],
- function: action_function.values[:name],
- status: status,
- relationship_id: relationship.values[:id] }
- task_id_hash[:payload] = opts
-
if opts.key? :master_id
- task_id_hash[:master_id] = opts[:master_id]
+ task_hash[:master_id] = opts[:master_id]
elsif opts.key? :parent_id
- task_id_hash[:master_id] = opts[:parent_id]
+ task_hash[:master_id] = opts[:parent_id]
elsif opts.key? :task_id
- task_id_hash[:master_id] = opts[:task_id]
+ task_hash[:master_id] = opts[:task_id]
end
- task_id_hash[:parent_id] = opts[:task_id] if opts.key? :task_id
+ task_hash[:parent_id] = opts[:task_id] if opts.key? :task_id
+ task_hash[:routing_key] = if relationship[:conditions].is_a?(String) && relationship[:conditions].length > 4
+ 'task.subtask.conditioner'
+ elsif relationship[:transformation].is_a?(String) && relationship[:transformation].length > 4 # rubocop:disable Layout/LineLength
+ 'task.subtask.transformation'
+ else
+ relationship[:runner_routing_key]
+ end
+
if opts[:result].is_a? Array
opts[:result].each do |result|
- send_task(task_id_hash,
- relationship: relationship,
- runner_record: runner_record,
- function_record: function_record,
- action_function: action_function,
- action_runner: action_runner,
- result: result)
+ send_task(results: result,
+ trigger_runner_id: trigger[:runner_id],
+ trigger_function_id: trigger[:function_id],
+ **task_hash)
end
else
- send_task(task_id_hash,
- relationship: relationship,
- runner_record: runner_record,
- function_record: function_record,
- action_function: action_function,
- action_runner: action_runner,
- **opts)
+ results = if opts[:results].is_a? Hash
+ opts[:results]
+ elsif opts[:result].is_a? Hash
+ opts[:result]
+ else
+ opts
+ end
+ send_task(
+ results: results,
+ trigger_runner_id: trigger[:runner_id],
+ trigger_function_id: trigger[:function_id],
+ **task_hash
+ )
end
end
- rescue StandardError => e
- Legion::Logging.fatal e.message
- Legion::Logging.fatal e.backtrace
- Legion::Logging.fatal runner_class
- Legion::Logging.fatal function
- Legion::Logging.fatal opts.keys
- Legion::Logging.fatal opts[:entry]
end
- def send_task(task_id_hash, relationship:, runner_record:, function_record:, action_function:, action_runner:, **opts) # rubocop:disable Layout/LineLength
- task_id = Legion::Runner::Status.generate_task_id(**task_id_hash)[:task_id]
+ def send_task(**opts)
+ opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
+ opts[:success] = if opts.key?(:result) && opts.key?(:success)
+ opts[:result][:success]
+ elsif opts.key?(:success)
+ opts[:success]
+ else
+ 1
+ end
- return { status: true } unless relationship.values[:delay].zero?
+ # opts[:task_id] = Legion::Runner::Status.generate_task_id(**opts)[:task_id]
+ opts[:task_id] = insert_task(**opts)
+ return { status: true } unless opts[:delay].zero?
- subtask_hash = {
- relationship_id: relationship.values[:id],
- chain_id: relationship.values[:chain_id],
- trigger_runner_id: runner_record.values[:id],
- trigger_function_id: function_record.values[:id],
- function_id: action_function.values[:id],
- function: action_function.values[:name],
- runner_id: action_runner.values[:id],
- runner_class: action_runner.values[:namespace],
- conditions: relationship.values[:conditions],
- transformation: relationship.values[:transformation],
- debug: relationship.values[:debug] && 1 || 0,
- task_id: task_id,
- results: opts[:result]
- }
+ Legion::Transport::Messages::SubTask.new(**opts).publish
+ end
- subtask_hash[:routing_key] = if subtask_hash[:conditions].is_a?(String) && subtask_hash[:conditioners].length > 4 # rubocop:disable Layout/LineLength
- 'task.subtask.conditioner'
- elsif subtask_hash[:transformation].is_a?(String) && subtask_hash[:transformation].length > 4 # rubocop:disable Layout/LineLength
- 'task.subtask.transform'
- else
- "#{runner_record.extension.values[:exchange]}.#{runner_record.values[:queue]}.#{subtask_hash[:function]}" # rubocop:disable Layout/LineLength
- end
-
- subtask_hash[:success] = if opts.nil?
- 1
- elsif opts.key?(:result)
- # opts[:result][:success]
- 1
- elsif opts.key?(:success)
- opts[:success]
- else
- 1
- end
- Legion::Transport::Messages::SubTask.new(**subtask_hash).publish
+ def insert_task(relationship_id:, function_id:, status: 'task.queued', master_id: nil, parent_id: nil, **opts)
+ insert_hash = { relationship_id: relationship_id, function_id: function_id, status: status }
+ insert_hash[:master_id] = if master_id.is_a? Integer
+ master_id
+ elsif parent_id.is_a? Integer
+ parent_id
+ end
+ insert_hash[:parent_id] = parent_id if parent_id.is_a? Integer
+ insert_hash[:payload] = Legion::JSON.dump(opts)
+ # insert_hash[:function_args] = nil
+ # insert_hash[:results] = nil
+ Legion::Data::Model::Task.insert(insert_hash)
end
end
end
end