lib/legion/extensions/tasker/runners/fetch_delayed.rb in lex-tasker-0.1.3 vs lib/legion/extensions/tasker/runners/fetch_delayed.rb in lex-tasker-0.2.0
- old
+ new
@@ -1,50 +1,71 @@
module Legion::Extensions::Tasker::Runners
module FetchDelayed
- include Legion::Extensions::Helpers::Lex
+ extend Legion::Extensions::Tasker::Helpers::FetchDelayed
+ include Legion::Extensions::Helpers::Task
def fetch(**_opts)
- tasks = Legion::Data::Model::Task.where(status: 'task.delayed')
- tasks_pushed = []
- log.debug "tasks.count = #{tasks.count}"
- tasks.each do |task|
- relationship = task.relationship
- next if !task.relationship.nil? && Time.now < task.values[:created] + relationship.values[:delay]
+ find_delayed.each do |task|
+ if task[:relationship_delay].is_a?(Integer) && task[:relationship_delay].positive?
+ next if Time.now < task[:created] + task[:relationship_delay] # rubocop:disable Style/SoleNestedConditional
+ end
- # next if Time.now < task.values[:created] + task.values[:delay]
+ if task[:task_delay].is_a?(Integer) && task[:task_delay].positive?
+ next if Time.now < task[:created] + task[:task_delay] # rubocop:disable Style/SoleNestedConditional
+ end
- subtask = Legion::Transport::Messages::SubTask.new(
- relationship_id: relationship.values[:id],
- chain_id: relationship.values[:chain_id],
- trigger_runner_id: relationship.trigger.runner.values[:id],
- trigger_function_id: relationship.values[:trigger_id],
- function_id: relationship.action.values[:id],
- function: relationship.action.values[:name],
- runner_id: relationship.action.values[:runner_id],
- runner_class: relationship.action.runner.values[:namespace],
- conditions: relationship.values[:conditions],
- transformation: relationship.values[:transformation],
- # debug: relationship.values[:debug],
- task_id: task.values[:id]
- # results: task.values[:payload]
- )
- subtask.publish
- task.update(status: 'conditioner.queued')
- tasks_pushed.push(task.values[:id])
- rescue StandardError => e
- task.update(status: 'task.push_exception')
- log.error e.message
- log.error e.backtrace
+ subtask_hash = {
+ relationship_id: task[:relationship_id],
+ chain_id: task[:chain_id],
+ function_id: task[:function_id],
+ function: task[:function_name],
+ runner_id: task[:runner_id],
+ runner_class: task[:runner_class],
+ task_id: task[:id],
+ exchange: task[:exchange],
+ queue: task[:queue]
+ }
+
+ subtask_hash[:conditions] = task[:conditions] if task[:conditions].is_a?(String)
+ subtask_hash[:transformation] = task[:transformation] if task[:transformation].is_a?(String)
+
+ subtask_hash[:routing_key] = if task[:conditions].is_a?(String) && task[:conditions].length > 4
+ 'task.subtask.conditioner'
+ elsif task[:transformation].is_a?(String) && task[:transformation].length > 4
+ 'task.subtask.transformation'
+ else
+ task[:runner_routing_key]
+ end
+
+ send_task(**subtask_hash)
+ case subtask_hash[:routing_key]
+ when 'task.subtask.conditioner'
+ task_update(task[:id], 'conditioner.queued')
+ when 'task.subtask.transformation'
+ task_update(task[:id], 'transformer.queued')
+ else
+ task_update(task[:id], 'task.queued')
+ end
end
+ end
- { success: true, count: tasks_pushed.count, tasks: tasks_pushed }
- rescue StandardError => e
- Legion::Logging.error e.message
- Legion::Logging.error e.backtrace
+ def send_task(**opts)
+ opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
+ opts[:success] = if opts.key?(:result) && opts.key?(:success)
+ opts[:result][:success]
+ elsif opts.key?(:success)
+ opts[:success]
+ else
+ 1
+ end
+ log.debug 'pushing delayed task to worker'
+ Legion::Transport::Messages::Task.new(**opts).publish
end
def push(**_opts)
Legion::Extensions::Tasker::Transport::Messages::FetchDelayed.new.publish
{ success: true }
end
+
+ include Legion::Extensions::Helpers::Lex
end
end