module Isomorfeus
  module Operation
    # Picks up persisted tasks in state 'ready', runs the operation each task
    # names on behalf of the task's user, and records the outcome on the task.
    #
    # An instance is driven by a timer object: every call to #run processes the
    # currently due tasks, fails tasks that have been 'running' for longer than
    # the interval, and finally re-arms the timer.
    class RunTask
      # @param task_class [Class] task store; must respond to `search`, `load`,
      #   `destroy` and `object_expander`
      # @param timer_task [#execution_interval, #execution_interval=] timer that
      #   triggers #run (presumably a Concurrent::TimerTask — confirm with caller)
      # @param interval [Numeric] nominal number of seconds between two runs
      # @param recurring [Boolean] when true, successfully run tasks are set back
      #   to 'ready' instead of being removed
      def initialize(task_class, timer_task:, interval:, recurring: false)
        @task_class = task_class
        @recurring = recurring
        @timer_task = timer_task
        @interval = interval
      end

      # Processes all due tasks once, then cleans up timed out tasks and
      # re-arms the timer.
      #
      # Any StandardError raised while preparing or starting a task is stored on
      # that task (state 'failed') and does not abort the remaining tasks.
      def run
        @rtime = Time.now
        get_tasks.each do |task|
          # Only one runner may claim a task; skip tasks that are claimed or not due.
          next unless mark_as_running(task)

          begin
            operation_class = task.operation_class_name.constantize
            Thread.current[:isomorfeus_user] = resolve_user(task)
            raise 'Access denied!' unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, task.props)

            execute(task, operation_class)
          rescue => e
            mark_as_failed(task, e)
          ensure
            # Never leak the task's user into whatever runs next on this thread.
            Thread.current[:isomorfeus_user] = nil
          end
        end
        cleanup
        # Subtract the time this run took so runs start roughly every @interval seconds.
        # NOTE(review): this becomes zero or negative when a run takes longer than
        # @interval — confirm the timer accepts such a value.
        @timer_task.execution_interval = @interval - (Time.now - @rtime)
      end

      # @return [Array] tasks in state 'ready', ordered by their last run time
      #   (tasks without an rtime sort first because nil.to_i is 0)
      def get_tasks
        @task_class.search(:state, 'ready').sort_by! { |task| task.rtime.to_i }
      end

      # Claims a task for this run inside a transaction of the task store.
      #
      # The task is reloaded so the decision is made on its current persisted
      # state. It is claimed only if it still exists, is still 'ready' and is due.
      #
      # NOTE(review): state and rtime are written to the reloaded copy; the
      # object passed in by the caller is not updated here.
      #
      # @param task [Object] task as returned by #get_tasks
      # @return [Boolean] true if the task was switched to 'running' by this call
      def mark_as_running(task)
        claimed = false
        @task_class.object_expander.environment.transaction do
          current = @task_class.load(key: task.key)
          # `current` is nil when the task was removed after the search.
          if current && current.state == 'ready' && due?(current)
            current.state = 'running'
            current.rtime = @rtime
            current.save
            claimed = true
          end
        end
        claimed
      end

      # Puts a task back into state 'ready' and persists it.
      def mark_as_ready(task)
        task.state = 'ready'
        task.save
      end

      # Sets a task to state 'failed' and persists it together with the exception.
      def mark_as_failed(task, exception)
        task.state = 'failed'
        save_exception(task, exception)
      end

      # Stores the marshalled exception on the task and persists the task.
      # The task's state is left untouched.
      #
      # @param task [Object] task to update
      # @param exception [Exception] must be serializable with Marshal.dump
      def save_exception(task, exception)
        task.exception = Marshal.dump(exception)
        task.save
      end

      # Deletes the task from the task store.
      def remove_task(task)
        @task_class.destroy(key: task.key)
      end

      # Fails every task that has been in state 'running' for longer than
      # @interval seconds, measured from the start of the current run.
      def cleanup
        running_tasks = @task_class.search(:state, 'running').sort_by! { |task| task.rtime.to_i }
        running_tasks.each do |task|
          # previous task run has timed out most probably
          mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.')) if (@rtime - task.rtime) > @interval
        end
      end

      private

      # Builds the user the task's operation is authorized against.
      # Falls back to Anonymous for unknown user classes and for users that
      # cannot be loaded.
      def resolve_user(task)
        user_class_name = task.user_class_name
        return LocalSystem.new if user_class_name == 'LocalSystem'
        return Anonymous.new unless Isomorfeus.valid_user_class_name?(user_class_name)

        user_class_name.constantize.load(key: task.user_key) || Anonymous.new
      end

      # Starts the operation and attaches the handlers that record its outcome.
      #
      # Recurring tasks return to 'ready' on success; on failure they are marked
      # 'failed' only if the task's `fail` value is truthy, otherwise just the
      # exception is stored. Non-recurring tasks are removed on success and
      # marked 'failed' on failure.
      def execute(task, operation_class)
        promise = operation_class.promise_run(**task.props)
        if @recurring
          promise
            .then { mark_as_ready(task) }
            .fail { |e| task.fail ? mark_as_failed(task, e) : save_exception(task, e) }
        else
          promise
            .then { remove_task(task) }
            .fail { |e| mark_as_failed(task, e) }
        end
      end

      # A task is due when it has never run, or when at least the timer's
      # current execution interval has passed since its last run.
      def due?(task)
        task.rtime.nil? || (@rtime - task.rtime) >= @timer_task.execution_interval
      end
    end
  end
end