Sha256: f66cd6fcafefb6f2e0879e65d194aed8a951229195f7d7142f719d91239aa6da
Contents?: true
Size: 1.16 KB
Versions: 2
Compression:
Stored size: 1.16 KB
Contents
# frozen_string_literal: true require "json" require "litequeue" module Litejob # Litejob::Processor is responsible for processing job payloads class Processor def initialize(payload) @payload = payload @queue = Litequeue.instance end def repush(id, job, delay = 0, queue = nil) @queue.repush(id, JSON.dump(job), queue: queue, delay: delay) end def process! id, serialized_job = @payload job_hash = JSON.parse(serialized_job) klass = Object.const_get(job_hash["class"]) instance = klass.new begin instance.perform(*job_hash["params"]) rescue if job_hash["retries_left"] == 0 repush(id, job_hash, 0, "_dead") else job_hash["retries_left"] ||= job_hash["attempts"] job_hash["retries_left"] -= 1 retry_delay = (job_hash["attempts"] - job_hash["retries_left"]) * 0.1 repush(id, job_hash, retry_delay, job_hash["queue"]) end end rescue => exception # standard:disable Lint/UselessRescue # this is an error in the extraction of job info, retrying here will not be useful raise exception end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
litejob-0.2.1 | lib/litejob/processor.rb |
litejob-0.2.0 | lib/litejob/processor.rb |