Sha256: 172372e041090ef76bcb2c45fee7185a413675f792c2eb0d94ab73e68a95ca55
Contents?: true
Size: 1.89 KB
Versions: 2
Compression:
Stored size: 1.89 KB
Contents
# frozen_string_literal: true require "active_record" module SidekiqPublisher class Job < ActiveRecord::Base self.table_name = "sidekiq_publisher_jobs" BATCH_KEYS = %i(id job_id job_class args run_at queue wrapped).freeze before_create :ensure_job_id before_save :ensure_string_job_class validates :job_class, presence: true validates :args, exclusion: { in: [nil] } scope :unpublished, -> { where(published_at: nil) } scope :published, -> { where.not(published_at: nil) } scope :purgeable, -> { where("published_at < ?", Time.now.utc - job_retention_period) } def self.generate_sidekiq_jid SecureRandom.hex(12) end def self.job_retention_period SidekiqPublisher.job_retention_period end def self.published!(ids) where(id: ids).update_all(published_at: Time.now.utc) end def self.purge_expired_published! SidekiqPublisher.logger.info("#{name} purging expired published jobs.") count = purgeable.delete_all SidekiqPublisher.logger.info("#{name} purged #{count} expired published jobs.") end def self.unpublished_batches(batch_size: SidekiqPublisher.batch_size) unpublished.in_batches(of: batch_size, load: false) do |relation| batch = relation.pluck(*BATCH_KEYS) yield batch.map { |values| Hash[BATCH_KEYS.zip(values)] } end end # TODO: this method was just for testing and may be removed def publish Sidekiq::Client.push(sidekiq_item) end def sidekiq_item { "jid" => job_id, "class" => job_class.constantize, "args" => args, "at" => run_at, "queue" => queue, "wrapped" => wrapped, }.tap(&:compact!) end private def ensure_job_id self.job_id ||= self.class.generate_sidekiq_jid end def ensure_string_job_class self.job_class = job_class.to_s end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
sidekiq_publisher-0.1.0 | lib/sidekiq_publisher/job.rb |
sidekiq_publisher-0.1.0.rc0 | lib/sidekiq_publisher/job.rb |