lib/pallets/worker.rb in pallets-0.2.0 vs lib/pallets/worker.rb in pallets-0.3.0

- old
+ new

@@ -24,10 +24,14 @@ def needs_to_stop? @needs_to_stop end + def debug + @thread.backtrace + end + def id "W#{@thread.object_id.to_s(36)}".upcase if @thread end private @@ -47,52 +51,71 @@ end @manager.remove_worker(self) rescue Pallets::Shutdown @manager.remove_worker(self) rescue => ex + Pallets.logger.error "#{ex.class.name}: #{ex.message}", wid: id + Pallets.logger.error ex.backtrace.join("\n"), wid: id unless ex.backtrace.nil? @manager.replace_worker(self) end def process(job) - Pallets.logger.info "[#{id}] Picked job: #{job}" begin job_hash = serializer.load(job) rescue # We ensure only valid jobs are created. If something fishy reaches this - # point, just discard it - backend.discard(job) + # point, just give up on it + backend.give_up(job, job) + Pallets.logger.error "Could not deserialize #{job}. Gave up job", wid: id return end + Pallets.logger.info "Started", extract_metadata(job_hash) + + context = Context[backend.get_context(job_hash['workflow_id'])] + task_class = Pallets::Util.constantize(job_hash["class_name"]) - task = task_class.new(job_hash["context"]) + task = task_class.new(context) begin task.run rescue => ex handle_job_error(ex, job, job_hash) else - backend.save(job_hash["workflow_id"], job) - Pallets.logger.info "[#{id}] Successfully processed #{job}" + handle_job_success(context, job, job_hash) end end def handle_job_error(ex, job, job_hash) - Pallets.logger.error "[#{id}] Error while processing: #{ex}" + Pallets.logger.warn "#{ex.class.name}: #{ex.message}", extract_metadata(job_hash) + Pallets.logger.warn ex.backtrace.join("\n"), extract_metadata(job_hash) unless ex.backtrace.nil? failures = job_hash.fetch('failures', 0) + 1 new_job = serializer.dump(job_hash.merge( 'failures' => failures, 'failed_at' => Time.now.to_f, 'error_class' => ex.class.name, 'error_message' => ex.message )) if failures < job_hash['max_failures'] retry_at = Time.now.to_f + backoff_in_seconds(failures) backend.retry(new_job, job, retry_at) - Pallets.logger.info "[#{id}] Scheduled job for retry" else - backend.give_up(new_job, job, Time.now.to_f) - Pallets.logger.info "[#{id}] Given up on job" + backend.give_up(new_job, job) + Pallets.logger.info "Gave up after #{failures} failed attempts", extract_metadata(job_hash) end + end + + def handle_job_success(context, job, job_hash) + backend.save(job_hash['workflow_id'], job, context.buffer) + Pallets.logger.info "Done", extract_metadata(job_hash) + end + + def extract_metadata(job_hash) + { + wid: id, + wfid: job_hash['workflow_id'], + wf: job_hash['workflow_class_name'], + tsk: job_hash['class_name'] + } end def backoff_in_seconds(count) count ** 4 + 6 end