lib/pallets/worker.rb in pallets-0.3.0 vs lib/pallets/worker.rb in pallets-0.4.0

- old
+ new

@@ -69,57 +69,74 @@ return end Pallets.logger.info "Started", extract_metadata(job_hash) - context = Context[backend.get_context(job_hash['workflow_id'])] + context = Context[ + serializer.load_context(backend.get_context(job_hash['wfid'])) + ] - task_class = Pallets::Util.constantize(job_hash["class_name"]) + task_class = Pallets::Util.constantize(job_hash["task_class"]) task = task_class.new(context) begin - task.run + task_result = task.run rescue => ex handle_job_error(ex, job, job_hash) else - handle_job_success(context, job, job_hash) + if task_result == false + handle_job_return_false(job, job_hash) + else + handle_job_success(context, job, job_hash) + end end end def handle_job_error(ex, job, job_hash) Pallets.logger.warn "#{ex.class.name}: #{ex.message}", extract_metadata(job_hash) Pallets.logger.warn ex.backtrace.join("\n"), extract_metadata(job_hash) unless ex.backtrace.nil? failures = job_hash.fetch('failures', 0) + 1 new_job = serializer.dump(job_hash.merge( 'failures' => failures, - 'failed_at' => Time.now.to_f, + 'given_up_at' => Time.now.to_f, 'error_class' => ex.class.name, - 'error_message' => ex.message + 'error_message' => ex.message, + 'reason' => 'error' )) if failures < job_hash['max_failures'] retry_at = Time.now.to_f + backoff_in_seconds(failures) backend.retry(new_job, job, retry_at) else backend.give_up(new_job, job) Pallets.logger.info "Gave up after #{failures} failed attempts", extract_metadata(job_hash) end end + def handle_job_return_false(job, job_hash) + new_job = serializer.dump(job_hash.merge( + 'given_up_at' => Time.now.to_f, + 'reason' => 'returned_false' + )) + backend.give_up(new_job, job) + Pallets.logger.info "Gave up after returning false", extract_metadata(job_hash) + end + def handle_job_success(context, job, job_hash) - backend.save(job_hash['workflow_id'], job, context.buffer) + backend.save(job_hash['wfid'], job, serializer.dump_context(context.buffer)) Pallets.logger.info "Done", extract_metadata(job_hash) end def extract_metadata(job_hash) { wid: id, - wfid: job_hash['workflow_id'], - wf: job_hash['workflow_class_name'], - tsk: job_hash['class_name'] + wfid: job_hash['wfid'], + jid: job_hash['jid'], + wf: job_hash['workflow_class'], + tsk: job_hash['task_class'] } end def backoff_in_seconds(count) - count ** 4 + 6 + count ** 4 + rand(6..10) end def backend @backend ||= Pallets.backend end