lib/litejob/processor.rb in litejob-0.2.1 vs lib/litejob/processor.rb in litejob-0.2.2

- old
+ new

@@ -4,38 +4,69 @@ require "litequeue" module Litejob # Litejob::Processor is responsible for processing job payloads class Processor - def initialize(payload) - @payload = payload - @queue = Litequeue.instance + def initialize(queue, id, serialized_job) + @queue = queue + @id = id + @serialized_job = serialized_job + @job_hash = JSON.parse(@serialized_job) + @litequeue = Litequeue.instance + + set_log_context!(queue: @queue, class: @job_hash["class"], job: @id) end def repush(id, job, delay = 0, queue = nil) - @queue.repush(id, JSON.dump(job), queue: queue, delay: delay) + @litequeue.repush(id, JSON.dump(job), queue: queue, delay: delay) end def process! - id, serialized_job = @payload - job_hash = JSON.parse(serialized_job) - klass = Object.const_get(job_hash["class"]) + log(:deq) + klass = Object.const_get(@job_hash["class"]) instance = klass.new begin - instance.perform(*job_hash["params"]) - rescue - if job_hash["retries_left"] == 0 - repush(id, job_hash, 0, "_dead") + instance.perform(*@job_hash["params"]) + log(:end) + rescue StandardError => e + if @job_hash["retries_left"] == 0 + err(e, "retries exhausted, moving to _dead queue") + repush(@id, @job_hash, 0, "_dead") else - job_hash["retries_left"] ||= job_hash["attempts"] - job_hash["retries_left"] -= 1 - retry_delay = (job_hash["attempts"] - job_hash["retries_left"]) * 0.1 - repush(id, job_hash, retry_delay, job_hash["queue"]) + @job_hash["retries_left"] ||= @job_hash["attempts"] + @job_hash["retries_left"] -= 1 + retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1 + err(e, "retrying in #{retry_delay} seconds") + repush(@id, @job_hash, retry_delay, @job_hash["queue"]) end end - rescue => exception # standard:disable Lint/UselessRescue + rescue StandardError => e # this is an error in the extraction of job info, retrying here will not be useful - raise exception + err(e, "while processing job=#{@serialized_job}") + raise e + end + + private + + def set_log_context!(**attributes) + @log_context = attributes.map { |k, v| [k, v].join('=') }.join(' ') + end + + def log(event, msg: nil) + prefix = "[litejob]:[#{event.to_s.upcase}]" + + Litejob.logger.info [prefix, @log_context, msg].compact.join(" ") + end + + def err(exception, msg = nil) + prefix = "[litejob]:[ERR]" + error_context = if exception.class.name == exception.message + "failed with #<#{exception.class.name}>" + else + "failed with #{exception.inspect}" + end + + Litejob.logger.error [prefix, @log_context, error_context, msg].compact.join(" ") end end end