Sha256: b08be03164156129c6f7f51fea448fa1174a91737722c23cf901fff98a9b0d1c
Contents?: true
Size: 1.7 KB
Versions: 1
Compression:
Stored size: 1.7 KB
Contents
require 'unique_job/logging'
require 'unique_job/job_history'

module UniqueJob
  # Shared helpers that decide whether a job should run, based on a
  # per-worker unique key recorded in a JobHistory.
  #
  # The including object is expected to provide +@redis_options+ (read by
  # #job_history) and a +logger+ (presumably supplied by Logging — confirm).
  module Util
    include Logging

    # Runs the given block unless the job is a duplicate.
    #
    # Workers that do not respond to +unique_key+ are always executed.
    # Otherwise the key is computed from the job arguments and the block is
    # run only when #unique? reports the key as not yet seen; on a duplicate
    # the worker's optional +after_skip+ callback is invoked instead.
    #
    # @param worker [Object] worker instance; may define +unique_key+,
    #   +unique_in+ and +after_skip+
    # @param job [Hash] job payload; the 'args' and 'class' entries are read
    # @yield executes the actual job
    # @return [Object, nil] the block's result, or nil when the job is skipped
    def perform(worker, job, &block)
      return yield unless worker.respond_to?(:unique_key)

      unique_key = worker.unique_key(*job['args'])
      logger.debug { "[UniqueJob] Unique key calculated worker=#{job['class']} key=#{unique_key}" }

      # unique? records the key as a side effect, so it must be called exactly
      # once per job. A previous empty `if unique?` branch in front of this
      # one swallowed every first-seen job without ever yielding.
      if unique?(worker, unique_key)
        yield
      else
        logger.debug { "[UniqueJob] Duplicate job skipped worker=#{job['class']} key=#{unique_key}" }
        perform_callback(worker, :after_skip, job['args'])
        nil
      end
    end

    # Checks whether +key+ is new for this worker and, if so, records it.
    #
    # A blank key (nil or empty after +to_s+) is never looked up; a warning is
    # logged and false is returned, which makes #perform skip the job.
    #
    # NOTE(review): exists?/add are two separate calls on the history, so two
    # concurrent callers could presumably both see the key as new — confirm
    # against JobHistory.
    #
    # @param worker [Object] worker instance the key belongs to
    # @param key [Object, nil] unique key for the job
    # @return [Boolean] true when the key was not in the history (it is added),
    #   false when it was already present or is blank
    def unique?(worker, key)
      # nil.to_s is "", so this single check covers both nil and empty keys.
      if key.to_s.empty?
        logger.warn { "[UniqueJob] Don't check a job with a blank key worker=#{worker.class} key=#{key}" }
        return false
      end

      history = job_history(worker)
      return false if history.exists?(key)

      history.add(key)
      true
    end

    # Builds the JobHistory used to track keys for +worker+.
    #
    # The TTL comes from +worker.unique_in+ when the worker defines it and
    # falls back to 3600 otherwise (unit not visible here; presumably
    # seconds — confirm against JobHistory).
    #
    # @param worker [Object] worker instance
    # @return [JobHistory]
    def job_history(worker)
      ttl = worker.respond_to?(:unique_in) ? worker.unique_in : 3600
      # JobHistory takes its Redis settings through a class-level writer.
      JobHistory.redis_options = @redis_options
      JobHistory.new(worker.class, self.class, ttl)
    end

    # Invokes an optional callback on the worker.
    #
    # Callbacks declared without parameters are called with no arguments;
    # any other callback receives the job arguments splatted.
    #
    # @param worker [Object] worker instance
    # @param callback_name [Symbol] name of the callback method
    # @param args [Array] job arguments forwarded to the callback
    # @return [Object, nil] the callback's result, or nil when the worker does
    #   not respond to +callback_name+
    # @raise [ArgumentError] when calling the callback raises ArgumentError
    def perform_callback(worker, callback_name, args)
      return unless worker.respond_to?(callback_name)

      parameters = worker.method(callback_name).parameters

      begin
        if parameters.empty?
          worker.send(callback_name)
        else
          worker.send(callback_name, *args)
        end
      rescue ArgumentError
        raise ArgumentError, "[UniqueJob] Invalid parameters callback=#{callback_name}"
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
unique_job-0.4.4 | lib/unique_job/util.rb |