lib/pallets/worker.rb in pallets-0.3.0 vs lib/pallets/worker.rb in pallets-0.4.0
- old
+ new
@@ -69,57 +69,74 @@
return
end
Pallets.logger.info "Started", extract_metadata(job_hash)
- context = Context[backend.get_context(job_hash['workflow_id'])]
+ context = Context[
+ serializer.load_context(backend.get_context(job_hash['wfid']))
+ ]
- task_class = Pallets::Util.constantize(job_hash["class_name"])
+ task_class = Pallets::Util.constantize(job_hash["task_class"])
task = task_class.new(context)
begin
- task.run
+ task_result = task.run
rescue => ex
handle_job_error(ex, job, job_hash)
else
- handle_job_success(context, job, job_hash)
+ if task_result == false
+ handle_job_return_false(job, job_hash)
+ else
+ handle_job_success(context, job, job_hash)
+ end
end
end
def handle_job_error(ex, job, job_hash)
Pallets.logger.warn "#{ex.class.name}: #{ex.message}", extract_metadata(job_hash)
Pallets.logger.warn ex.backtrace.join("\n"), extract_metadata(job_hash) unless ex.backtrace.nil?
failures = job_hash.fetch('failures', 0) + 1
new_job = serializer.dump(job_hash.merge(
'failures' => failures,
- 'failed_at' => Time.now.to_f,
+ 'given_up_at' => Time.now.to_f,
'error_class' => ex.class.name,
- 'error_message' => ex.message
+ 'error_message' => ex.message,
+ 'reason' => 'error'
))
if failures < job_hash['max_failures']
retry_at = Time.now.to_f + backoff_in_seconds(failures)
backend.retry(new_job, job, retry_at)
else
backend.give_up(new_job, job)
Pallets.logger.info "Gave up after #{failures} failed attempts", extract_metadata(job_hash)
end
end
+ def handle_job_return_false(job, job_hash)
+ new_job = serializer.dump(job_hash.merge(
+ 'given_up_at' => Time.now.to_f,
+ 'reason' => 'returned_false'
+ ))
+ backend.give_up(new_job, job)
+ Pallets.logger.info "Gave up after returning false", extract_metadata(job_hash)
+ end
+
def handle_job_success(context, job, job_hash)
- backend.save(job_hash['workflow_id'], job, context.buffer)
+ backend.save(job_hash['wfid'], job, serializer.dump_context(context.buffer))
Pallets.logger.info "Done", extract_metadata(job_hash)
end
def extract_metadata(job_hash)
{
wid: id,
- wfid: job_hash['workflow_id'],
- wf: job_hash['workflow_class_name'],
- tsk: job_hash['class_name']
+ wfid: job_hash['wfid'],
+ jid: job_hash['jid'],
+ wf: job_hash['workflow_class'],
+ tsk: job_hash['task_class']
}
end
def backoff_in_seconds(count)
- count ** 4 + 6
+ count ** 4 + rand(6..10)
end
def backend
@backend ||= Pallets.backend
end