lib/pallets/worker.rb in pallets-0.2.0 vs lib/pallets/worker.rb in pallets-0.3.0
- old
+ new
@@ -24,10 +24,14 @@
    def needs_to_stop?
      @needs_to_stop
    end

+   def debug
+     @thread.backtrace
+   end
+
    def id
      "W#{@thread.object_id.to_s(36)}".upcase if @thread
    end

    private
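Note: the new `debug` method exposes the worker thread's backtrace (`Thread#backtrace` returns nil once the thread is dead). A minimal sketch of how a supervising process could use it to inspect stuck workers; `manager.workers` is a hypothetical accessor used only for illustration:

# Hypothetical helper: print the current backtrace of every worker a manager holds.
def dump_worker_backtraces(manager)
  manager.workers.each do |worker|
    puts "=== #{worker.id} ==="
    puts(worker.debug || ['<thread not running>'])
  end
end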
@@ -47,52 +51,71 @@
      end
      @manager.remove_worker(self)
    rescue Pallets::Shutdown
      @manager.remove_worker(self)
    rescue => ex
+     Pallets.logger.error "#{ex.class.name}: #{ex.message}", wid: id
+     Pallets.logger.error ex.backtrace.join("\n"), wid: id unless ex.backtrace.nil?
      @manager.replace_worker(self)
    end
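Note: errors that escape the work loop are now logged with the worker id before the worker is replaced. The second argument (`wid: id`) suggests the 0.3.0 logger accepts a trailing metadata hash. A minimal sketch of a logger with that calling convention, assuming the metadata is simply appended to the line; this is illustrative, not the gem's implementation:

require 'logger'

# Sketch: severity methods take an optional metadata hash and append it
# to the message as key=value pairs.
class MetadataLogger < Logger
  %i[debug info warn error fatal].each do |severity|
    define_method(severity) do |message, metadata = {}|
      tags = metadata.map { |k, v| "#{k}=#{v}" }.join(' ')
      super(tags.empty? ? message : "#{message} #{tags}")
    end
  end
end

MetadataLogger.new($stdout).error 'RuntimeError: boom', wid: 'W1A2B3C'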
    def process(job)
-     Pallets.logger.info "[#{id}] Picked job: #{job}"
      begin
        job_hash = serializer.load(job)
      rescue
        # We ensure only valid jobs are created. If something fishy reaches this
-       # point, just discard it
-       backend.discard(job)
+       # point, just give up on it
+       backend.give_up(job, job)
+       Pallets.logger.error "Could not deserialize #{job}. Gave up job", wid: id
        return
      end
+     Pallets.logger.info "Started", extract_metadata(job_hash)
+
+     context = Context[backend.get_context(job_hash['workflow_id'])]
+
      task_class = Pallets::Util.constantize(job_hash["class_name"])
-     task = task_class.new(job_hash["context"])
+     task = task_class.new(context)
      begin
        task.run
      rescue => ex
        handle_job_error(ex, job, job_hash)
      else
-       backend.save(job_hash["workflow_id"], job)
-       Pallets.logger.info "[#{id}] Successfully processed #{job}"
+       handle_job_success(context, job, job_hash)
      end
    end
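Note: the workflow context is no longer carried inside each job payload (`job_hash["context"]`); it is fetched from the backend per workflow and wrapped in `Context`. Tasks are still constructed with the context and then `run`. A hedged sketch of a task shaped to match that calling convention; the class and keys are made up for illustration:

# Illustrative task: matches `task_class.new(context)` followed by `task.run`.
class ResizeImage
  def initialize(context)
    @context = context
  end

  def run
    # Read a value produced upstream and publish one for downstream tasks.
    path = @context['image_path']
    @context['thumbnail_path'] = "#{path}.thumb.png"
  end
end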
    def handle_job_error(ex, job, job_hash)
-     Pallets.logger.error "[#{id}] Error while processing: #{ex}"
+     Pallets.logger.warn "#{ex.class.name}: #{ex.message}", extract_metadata(job_hash)
+     Pallets.logger.warn ex.backtrace.join("\n"), extract_metadata(job_hash) unless ex.backtrace.nil?
      failures = job_hash.fetch('failures', 0) + 1
      new_job = serializer.dump(job_hash.merge(
        'failures' => failures,
        'failed_at' => Time.now.to_f,
        'error_class' => ex.class.name,
        'error_message' => ex.message
      ))
      if failures < job_hash['max_failures']
        retry_at = Time.now.to_f + backoff_in_seconds(failures)
        backend.retry(new_job, job, retry_at)
-       Pallets.logger.info "[#{id}] Scheduled job for retry"
      else
-       backend.give_up(new_job, job, Time.now.to_f)
-       Pallets.logger.info "[#{id}] Given up on job"
+       backend.give_up(new_job, job)
+       Pallets.logger.info "Gave up after #{failures} failed attempts", extract_metadata(job_hash)
      end
+   end
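Note: the failure bookkeeping is folded back into the job payload before it is handed to the backend, so the next attempt sees how many failures preceded it. Worked through with a hypothetical job created with 'max_failures' => 3:

# Two earlier attempts already failed and were recorded in the payload.
job_hash = { 'max_failures' => 3, 'failures' => 2 }
failures = job_hash.fetch('failures', 0) + 1   # => 3
failures < job_hash['max_failures']            # => false: backend.give_up instead of backend.retry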
+
+   def handle_job_success(context, job, job_hash)
+     backend.save(job_hash['workflow_id'], job, context.buffer)
+     Pallets.logger.info "Done", extract_metadata(job_hash)
+   end
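Note: `backend.save` now receives `context.buffer` alongside the job, which suggests the context tracks the keys written while the task ran so that only that delta is persisted back to the shared workflow context. A minimal sketch of a Hash with such a buffer, assuming that behaviour; the gem's own Context may differ:

# Sketch: remembers writes made after construction so only the delta is saved.
class BufferedContext < Hash
  def buffer
    @buffer ||= {}
  end

  def []=(key, value)
    buffer[key] = value
    super
  end
end

context = BufferedContext[{ 'image_path' => 'a.png' }]  # loaded from the backend
context['thumbnail_path'] = 'a.thumb.png'               # written by the task
context.buffer  # => {"thumbnail_path"=>"a.thumb.png"}, the only part that needs saving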
+
+   def extract_metadata(job_hash)
+     {
+       wid: id,
+       wfid: job_hash['workflow_id'],
+       wf: job_hash['workflow_class_name'],
+       tsk: job_hash['class_name']
+     }
    end
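Note: every log line now carries the same structured metadata: worker id, workflow id, workflow class name and task class name. For a hypothetical payload (values made up for illustration):

job_hash = {
  'workflow_id'         => 'qm2c7r',
  'workflow_class_name' => 'ProcessOrder',
  'class_name'          => 'ChargeCard'
}
# On a worker whose id is "W4F9K2A", extract_metadata(job_hash) yields
# { wid: "W4F9K2A", wfid: "qm2c7r", wf: "ProcessOrder", tsk: "ChargeCard" }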
    def backoff_in_seconds(count)
      count ** 4 + 6
    end
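Note: the retry delay grows with the fourth power of the failure count, so early retries happen quickly and later ones back off sharply:

(1..5).map { |count| count ** 4 + 6 }
# => [7, 22, 87, 262, 631]  seconds added to Time.now for failures 1 through 5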