lib/isomorfeus/operation/run_task.rb in isomorfeus-operation-2.5.5 vs lib/isomorfeus/operation/run_task.rb in isomorfeus-operation-22.9.0.rc1
- old
+ new
@@ -7,33 +7,45 @@
@timer_task = timer_task
@interval = interval
end
def run
+ Isomorfeus.init_store
+ Isomorfeus.store.clear!
@rtime = Time.now
tasks = get_tasks
tasks.each do |task|
marked = mark_as_running(task)
if marked
begin
- operation_class = task.operation_class_name.constantize
- user_class_name = task.user_class_name
- Thread.current[:isomorfeus_user] = if user_class_name == 'LocalSystem'
- LocalSystem.new
- elsif Isomorfeus.valid_user_class_name?(user_class_name)
- user_class = user_class_name.constantize
- user_class.load(key: task.user_key) || Anonymous.new
- else
- Anonymous.new
- end
- raise 'Access denied!' unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, task.props)
+ operation_class = task[:operation_class_name].constantize
+ user_class_name = task[:user_class_name]
+ user_instance = if user_class_name == 'LocalSystem'
+ LocalSystem.new
+ elsif user_class_name == 'Anonymous'
+ Anonymous.new
+ elsif Isomorfeus.valid_user_class_name?(user_class_name)
+ user_class = user_class_name.constantize
+ cu = Thread.current[:isomorfeus_user]
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+ begin
+ u = user_class.load(key: task[:user_key]) || Anonymous.new
+ ensure
+ Thread.current[:isomorfeus_user] = cu
+ end
+ u
+ else
+ Anonymous.new
+ end
+ Thread.current[:isomorfeus_user] = user_instance
+ raise 'Access denied!' unless Thread.current[:isomorfeus_user].authorized?(operation_class, :promise_run, task[:props])
if @recurring
- operation_class.promise_run(**task.props)
+ operation_class.promise_run(**task[:props])
.then { mark_as_ready(task) }
- .fail { |e| task.fail ? mark_as_failed(task, e) : save_exception(task, e) }
+ .fail { |e| task[:fail] ? mark_as_failed(task, e) : save_exception(task, e) }
else
- operation_class.promise_run(**task.props)
+ operation_class.promise_run(**task[:props])
.then { remove_task(task) }
.fail { |e| mark_as_failed(task, e) }
end
rescue => e
mark_as_failed(task, e)
@@ -45,49 +57,86 @@
cleanup
@timer_task.execution_interval = @interval - (Time.now - @rtime)
end
def get_tasks
- @task_class.search(:state, 'ready').sort_by! { |task| task.rtime.to_i }
+ begin
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ result = @task_class.search(:ready).sort_by! { |task| task[:rtime] ? Time.parse(task[:rtime]).to_i : 0 }
+ ensure
+ Thread.current[:isomorfeus_user] = nil
+ end
+ result
end
def mark_as_running(task)
- result = false
- task = @task_class.load(key: task.key)
- if task && (task.rtime.nil? || (task.rtime - @rtime) >= @timer_task.execution_interval) && task.state == 'ready'
- task.state = 'running'
- task.rtime = @rtime
- task.save
- result = true
+ begin
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ task = @task_class.load(key: task.key)
+ trt = task[:rtime] ? Time.parse(task[:rtime]) : nil
+ result = false
+ if task && (trt.nil? || (trt - @rtime) >= @timer_task.execution_interval) && task[:state] == 'ready'
+ task[:state] = 'running'
+ task[:rtime] = @rtime.to_s
+ task.save
+ result = true
+ end
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
result
end
def mark_as_ready(task)
- task.state = 'ready'
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ task[:state] = 'ready'
task.save
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
def mark_as_failed(task, exception)
- task.state = 'failed'
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ task[:state] = 'failed'
save_exception(task, exception)
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
- def save_exception(taks, exception)
- task.exception = Marshal.dump(exception)
+ def save_exception(task, exception)
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ task[:exception] = Marshal.dump(exception)
task.save
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
def remove_task(task)
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
@task_class.destroy(key: task.key)
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
def cleanup
- running_tasks = @task_class.search(:state, 'running').sort_by! { |task| task.rtime.to_i }
+ Thread.current[:isomorfeus_user] = LocalSystem.new
+
+ running_tasks = @task_class.search(:running).sort_by! do |task|
+ task[:rtime] ? Time.parse(task[:rtime]).to_i : 0
+ end
running_tasks.each do |task|
# previous task run has timed out most probably
- mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.')) if (@rtime - task.rtime) > @interval
+ trt = task[:rtime] ? Time.parse(task[:rtime]) : 0
+ mark_as_failed(task, RuntimeError.new('Operation execution timed out, giving up.')) if (@rtime - trt) > @interval
end
+ ensure
+ Thread.current[:isomorfeus_user] = nil
end
end
end
end