lib/isomorfeus/operation/run_task.rb in isomorfeus-operation-2.5.5 vs lib/isomorfeus/operation/run_task.rb in isomorfeus-operation-22.9.0.rc1

- old
+ new

@@ -7,33 +7,45 @@ @timer_task = timer_task @interval = interval end def run + Isomorfeus.init_store + Isomorfeus.store.clear! @rtime = Time.now tasks = get_tasks tasks.each do |task| marked = mark_as_running(task) if marked begin - operation_class = task.operation_class_name.constantize - user_class_name = task.user_class_name - Thread.current[:isomorfeus_user] = if user_class_name == 'LocalSystem' - LocalSystem.new - elsif Isomorfeus.valid_user_class_name?(user_class_name) - user_class = user_class_name.constantize - user_class.load(key: task.user_key) || Anonymous.new - else - Anonymous.new - end - raise 'Access denied!' unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, task.props) + operation_class = task[:operation_class_name].constantize + user_class_name = task[:user_class_name] + user_instance = if user_class_name == 'LocalSystem' + LocalSystem.new + elsif user_class_name == 'Anonymous' + Anonymous.new + elsif Isomorfeus.valid_user_class_name?(user_class_name) + user_class = user_class_name.constantize + cu = Thread.current[:isomorfeus_user] + Thread.current[:isomorfeus_user] = LocalSystem.new + begin + u = user_class.load(key: task[:user_key]) || Anonymous.new + ensure + Thread.current[:isomorfeus_user] = cu + end + u + else + Anonymous.new + end + Thread.current[:isomorfeus_user] = user_instance + raise 'Access denied!' unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, task[:props]) if @recurring - operation_class.promise_run(**task.props) + operation_class.promise_run(**task[:props]) .then { mark_as_ready(task) } - .fail { |e| task.fail ? mark_as_failed(task, e) : save_exception(task, e) } + .fail { |e| task[:fail] ? mark_as_failed(task, e) : save_exception(task, e) } else - operation_class.promise_run(**task.props) + operation_class.promise_run(**task[:props]) .then { remove_task(task) } .fail { |e| mark_as_failed(task, e) } end rescue => e mark_as_failed(task, e) @@ -45,49 +57,86 @@ cleanup @timer_task.execution_interval = @interval - (Time.now - @rtime) end def get_tasks - @task_class.search(:state, 'ready').sort_by! { |task| task.rtime.to_i } + begin + Thread.current[:isomorfeus_user] = LocalSystem.new + + result = @task_class.search(:ready).sort_by! { |task| task[:rtime] ? Time.parse(task[:rtime]).to_i : 0 } + ensure + Thread.current[:isomorfeus_user] = nil + end + result end def mark_as_running(task) - result = false - task = @task_class.load(key: task.key) - if task && (task.rtime.nil? || (task.rtime - @rtime) >= @timer_task.execution_interval) && task.state == 'ready' - task.state = 'running' - task.rtime = @rtime - task.save - result = true + begin + Thread.current[:isomorfeus_user] = LocalSystem.new + + task = @task_class.load(key: task.key) + trt = task[:rtime] ? Time.parse(task[:rtime]) : nil + result = false + if task && (trt.nil? || (trt - @rtime) >= @timer_task.execution_interval) && task[:state] == 'ready' + task[:state] = 'running' + task[:rtime] = @rtime.to_s + task.save + result = true + end + ensure + Thread.current[:isomorfeus_user] = nil end result end def mark_as_ready(task) - task.state = 'ready' + Thread.current[:isomorfeus_user] = LocalSystem.new + + task[:state] = 'ready' task.save + ensure + Thread.current[:isomorfeus_user] = nil end def mark_as_failed(task, exception) - task.state = 'failed' + Thread.current[:isomorfeus_user] = LocalSystem.new + + task[:state] = 'failed' save_exception(task, exception) + ensure + Thread.current[:isomorfeus_user] = nil end - def save_exception(taks, exception) - task.exception = Marshal.dump(exception) + def save_exception(task, exception) + Thread.current[:isomorfeus_user] = LocalSystem.new + + task[:exception] = Marshal.dump(exception) task.save + ensure + Thread.current[:isomorfeus_user] = nil end def remove_task(task) + Thread.current[:isomorfeus_user] = LocalSystem.new + @task_class.destroy(key: task.key) + ensure + Thread.current[:isomorfeus_user] = nil end def cleanup - running_tasks = @task_class.search(:state, 'running').sort_by! { |task| task.rtime.to_i } + Thread.current[:isomorfeus_user] = LocalSystem.new + + running_tasks = @task_class.search(:running).sort_by! do |task| + task[:rtime] ? Time.parse(task[:rtime]).to_i : 0 + end running_tasks.each do |task| # previous task run has timed out most probably - mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.')) if (@rtime - task.rtime) > @interval + trt = task[:rtime] ? Time.parse(task[:rtime]) : 0 + mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.')) if (@rtime - trt) > @interval end + ensure + Thread.current[:isomorfeus_user] = nil end end end end