module Isomorfeus
  module Operation
    # Runs the stored tasks of one task class from a timer.
    #
    # Each call to #run loads the tasks in state 'ready', marks them as
    # 'running', executes their operation on behalf of the user recorded in
    # the task and finally flags 'running' tasks that exceeded the interval
    # as failed. All store access happens with a LocalSystem instance in
    # Thread.current[:isomorfeus_user]; the slot is reset to nil afterwards.
    class RunTask
      # @param task_class the class used to search, load and destroy tasks
      # @param timer_task the timer object; its execution_interval is read
      #   in #mark_as_running and re-assigned at the end of #run
      # @param interval the nominal number of seconds between two runs
      # @param recurring when true a finished task is set back to 'ready'
      #   instead of being removed
      def initialize(task_class, timer_task:, interval:, recurring: false)
        @task_class = task_class
        @recurring = recurring
        @timer_task = timer_task
        @interval = interval
      end

      # Processes all ready tasks once, cleans up timed out tasks and
      # re-adjusts the timer interval by the time this run took.
      def run
        Isomorfeus.init_store
        Isomorfeus.store.clear!
        @rtime = Time.now
        get_tasks.each do |task|
          next unless mark_as_running(task)

          begin
            operation_class = task[:operation_class_name].constantize
            Thread.current[:isomorfeus_user] = resolve_user(task)
            props = task[:props].transform_keys(&:to_sym)
            unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, props)
              raise 'Access denied!'
            end

            if @recurring
              # A recurring task is only moved to 'failed' when it asks for it
              # via task[:fail]; otherwise just the exception is recorded.
              operation_class.promise_run(**props)
                             .then { mark_as_ready(task) }
                             .fail { |e| task[:fail] ? mark_as_failed(task, e) : save_exception(task, e) }
            else
              operation_class.promise_run(**props)
                             .then { remove_task(task) }
                             .fail { |e| mark_as_failed(task, e) }
            end
          rescue => e
            mark_as_failed(task, e)
          ensure
            Thread.current[:isomorfeus_user] = nil
          end
        end
        cleanup
        # NOTE(review): when the run took longer than @interval this value is
        # zero or negative - confirm the timer task accepts that.
        @timer_task.execution_interval = @interval - (Time.now - @rtime)
      end

      # @return the tasks in state 'ready', ordered by their recorded run
      #   time; tasks without a run time come first
      def get_tasks
        Thread.current[:isomorfeus_user] = LocalSystem.new
        @task_class.search(:ready).sort_by! { |task| task_time(task).to_i }
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Reloads the task and, if it is still 'ready' and its recorded run
      # time lies at least one timer interval back (or is missing), stores it
      # as 'running' with the current run time. The reloaded instance is the
      # one that is modified and saved; the object passed in is not touched.
      #
      # @param task a task as returned by #get_tasks
      # @return [Boolean] true when the task was marked, false otherwise
      #   (including when the task can no longer be loaded)
      def mark_as_running(task)
        Thread.current[:isomorfeus_user] = LocalSystem.new
        task = @task_class.load(key: task.key)
        # Check for nil before touching any attribute: the task may have been
        # removed since it was found by the search.
        return false unless task && task[:state] == 'ready'
        # Elapsed time is "now minus last run", the same direction #cleanup uses.
        return false if (@rtime - task_time(task)) < @timer_task.execution_interval

        task[:state] = 'running'
        task[:rtime] = @rtime.to_s
        task.save
        true
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Sets the task back to 'ready' and saves it.
      def mark_as_ready(task)
        Thread.current[:isomorfeus_user] = LocalSystem.new
        task[:state] = 'ready'
        task.save
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Sets the task to 'failed' and saves it together with the exception.
      def mark_as_failed(task, exception)
        Thread.current[:isomorfeus_user] = LocalSystem.new
        task[:state] = 'failed'
        save_exception(task, exception)
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Stores the marshalled exception in the task and saves the task.
      def save_exception(task, exception)
        Thread.current[:isomorfeus_user] = LocalSystem.new
        task[:exception] = Marshal.dump(exception)
        task.save
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Destroys the stored task with the key of the given task.
      def remove_task(task)
        Thread.current[:isomorfeus_user] = LocalSystem.new
        @task_class.destroy(key: task.key)
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      # Marks every 'running' task whose run time lies more than @interval
      # seconds before the current run as failed. A task without a run time
      # is treated as the oldest possible one.
      def cleanup
        Thread.current[:isomorfeus_user] = LocalSystem.new
        running_tasks = @task_class.search(:running).sort_by! { |task| task_time(task).to_i }
        running_tasks.each do |task|
          next unless (@rtime - task_time(task)) > @interval

          # previous task run has timed out most probably
          mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.'))
        end
      ensure
        Thread.current[:isomorfeus_user] = nil
      end

      private

      # Builds the user instance the operation of the task runs as.
      # Unknown user class names and users that cannot be loaded result in an
      # Anonymous instance. Loading is done as LocalSystem; the previous
      # content of the thread-local user slot is restored afterwards.
      def resolve_user(task)
        user_class_name = task[:user_class_name]
        return LocalSystem.new if user_class_name == 'LocalSystem'
        return Anonymous.new if user_class_name == 'Anonymous'
        return Anonymous.new unless Isomorfeus.valid_user_class_name?(user_class_name)

        user_class = user_class_name.constantize
        previous_user = Thread.current[:isomorfeus_user]
        Thread.current[:isomorfeus_user] = LocalSystem.new
        begin
          user_class.load(key: task[:user_key]) || Anonymous.new
        ensure
          Thread.current[:isomorfeus_user] = previous_user
        end
      end

      # Returns the run time recorded in the task as a Time. A missing run
      # time yields the epoch, so that subtracting it from another Time gives
      # a number of seconds (subtracting an Integer would give a Time).
      def task_time(task)
        recorded = task[:rtime]
        recorded ? Time.parse(recorded) : Time.at(0)
      end
    end
  end
end