lib/legion/extensions/tasker/runners/fetch_delayed.rb in lex-tasker-0.1.3 vs lib/legion/extensions/tasker/runners/fetch_delayed.rb in lex-tasker-0.2.0

- old
+ new

@@ -1,50 +1,71 @@ module Legion::Extensions::Tasker::Runners module FetchDelayed - include Legion::Extensions::Helpers::Lex + extend Legion::Extensions::Tasker::Helpers::FetchDelayed + include Legion::Extensions::Helpers::Task def fetch(**_opts) - tasks = Legion::Data::Model::Task.where(status: 'task.delayed') - tasks_pushed = [] - log.debug "tasks.count = #{tasks.count}" - tasks.each do |task| - relationship = task.relationship - next if !task.relationship.nil? && Time.now < task.values[:created] + relationship.values[:delay] + find_delayed.each do |task| + if task[:relationship_delay].is_a?(Integer) && task[:relationship_delay].positive? + next if Time.now < task[:created] + task[:relationship_delay] # rubocop:disable Style/SoleNestedConditional + end - # next if Time.now < task.values[:created] + task.values[:delay] + if task[:task_delay].is_a?(Integer) && task[:task_delay].positive? + next if Time.now < task[:created] + task[:task_delay] # rubocop:disable Style/SoleNestedConditional + end - subtask = Legion::Transport::Messages::SubTask.new( - relationship_id: relationship.values[:id], - chain_id: relationship.values[:chain_id], - trigger_runner_id: relationship.trigger.runner.values[:id], - trigger_function_id: relationship.values[:trigger_id], - function_id: relationship.action.values[:id], - function: relationship.action.values[:name], - runner_id: relationship.action.values[:runner_id], - runner_class: relationship.action.runner.values[:namespace], - conditions: relationship.values[:conditions], - transformation: relationship.values[:transformation], - # debug: relationship.values[:debug], - task_id: task.values[:id] - # results: task.values[:payload] - ) - subtask.publish - task.update(status: 'conditioner.queued') - tasks_pushed.push(task.values[:id]) - rescue StandardError => e - task.update(status: 'task.push_exception') - log.error e.message - log.error e.backtrace + subtask_hash = { + relationship_id: task[:relationship_id], + chain_id: task[:chain_id], + function_id: task[:function_id], + function: task[:function_name], + runner_id: task[:runner_id], + runner_class: task[:runner_class], + task_id: task[:id], + exchange: task[:exchange], + queue: task[:queue] + } + + subtask_hash[:conditions] = task[:conditions] if task[:conditions].is_a?(String) + subtask_hash[:transformation] = task[:transformation] if task[:transformation].is_a?(String) + + subtask_hash[:routing_key] = if task[:conditions].is_a?(String) && task[:conditions].length > 4 + 'task.subtask.conditioner' + elsif task[:transformation].is_a?(String) && task[:transformation].length > 4 + 'task.subtask.transformation' + else + task[:runner_routing_key] + end + + send_task(**subtask_hash) + case subtask_hash[:routing_key] + when 'task.subtask.conditioner' + task_update(task[:id], 'conditioner.queued') + when 'task.subtask.transformation' + task_update(task[:id], 'transformer.queued') + else + task_update(task[:id], 'task.queued') + end end + end - { success: true, count: tasks_pushed.count, tasks: tasks_pushed } - rescue StandardError => e - Legion::Logging.error e.message - Legion::Logging.error e.backtrace + def send_task(**opts) + opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results) + opts[:success] = if opts.key?(:result) && opts.key?(:success) + opts[:result][:success] + elsif opts.key?(:success) + opts[:success] + else + 1 + end + log.debug 'pushing delayed task to worker' + Legion::Transport::Messages::Task.new(**opts).publish end def push(**_opts) Legion::Extensions::Tasker::Transport::Messages::FetchDelayed.new.publish { success: true } end + + include Legion::Extensions::Helpers::Lex end end