lib/pallets/worker.rb in pallets-0.4.0 vs lib/pallets/worker.rb in pallets-0.5.0
- old
+ new
@@ -67,20 +67,20 @@
backend.give_up(job, job)
Pallets.logger.error "Could not deserialize #{job}. Gave up job", wid: id
return
end
- Pallets.logger.info "Started", extract_metadata(job_hash)
-
context = Context[
serializer.load_context(backend.get_context(job_hash['wfid']))
]
task_class = Pallets::Util.constantize(job_hash["task_class"])
task = task_class.new(context)
begin
- task_result = task.run
+ task_result = middleware.invoke(self, job_hash, context) do
+ task.run
+ end
rescue => ex
handle_job_error(ex, job, job_hash)
else
if task_result == false
handle_job_return_false(job, job_hash)
@@ -89,12 +89,10 @@
end
end
end
def handle_job_error(ex, job, job_hash)
- Pallets.logger.warn "#{ex.class.name}: #{ex.message}", extract_metadata(job_hash)
- Pallets.logger.warn ex.backtrace.join("\n"), extract_metadata(job_hash) unless ex.backtrace.nil?
failures = job_hash.fetch('failures', 0) + 1
new_job = serializer.dump(job_hash.merge(
'failures' => failures,
'given_up_at' => Time.now.to_f,
'error_class' => ex.class.name,
@@ -104,46 +102,37 @@
if failures < job_hash['max_failures']
retry_at = Time.now.to_f + backoff_in_seconds(failures)
backend.retry(new_job, job, retry_at)
else
backend.give_up(new_job, job)
- Pallets.logger.info "Gave up after #{failures} failed attempts", extract_metadata(job_hash)
end
end
def handle_job_return_false(job, job_hash)
new_job = serializer.dump(job_hash.merge(
'given_up_at' => Time.now.to_f,
'reason' => 'returned_false'
))
backend.give_up(new_job, job)
- Pallets.logger.info "Gave up after returning false", extract_metadata(job_hash)
end
def handle_job_success(context, job, job_hash)
backend.save(job_hash['wfid'], job, serializer.dump_context(context.buffer))
- Pallets.logger.info "Done", extract_metadata(job_hash)
end
- def extract_metadata(job_hash)
- {
- wid: id,
- wfid: job_hash['wfid'],
- jid: job_hash['jid'],
- wf: job_hash['workflow_class'],
- tsk: job_hash['task_class']
- }
- end
-
def backoff_in_seconds(count)
count ** 4 + rand(6..10)
end
def backend
@backend ||= Pallets.backend
end
def serializer
@serializer ||= Pallets.serializer
+ end
+
+ def middleware
+ @middleware ||= Pallets.middleware
end
end
end