Sha256: f2dec724414d14d73a72434af6ced9eb64391aacf493b00285a4b760cb6f2a31
Contents?: true
Size: 1.9 KB
Versions: 2
Compression:
Stored size: 1.9 KB
Contents
module GoodJob class Job < ActiveRecord::Base include Lockable DEFAULT_QUEUE_NAME = 'default'.freeze DEFAULT_PRIORITY = 0 self.table_name = 'good_jobs'.freeze scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) } scope :priority_ordered, -> { order(priority: :desc) } scope :to_performer, -> { Performer.new(self) } class Performer def initialize(query) @query = query end def next good_job = nil @query.only_scheduled.limit(1).with_advisory_lock do |good_jobs| good_job = good_jobs.first break unless good_job good_job.perform end good_job end end def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) good_job = nil ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job = GoodJob::Job.new( queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, scheduled_at: scheduled_at || Time.current, create_with_advisory_lock: create_with_advisory_lock ) instrument_payload[:good_job] = good_job good_job.save! active_job.provider_job_id = good_job.id end good_job end def perform ActiveSupport::Notifications.instrument("before_perform_job.good_job", { good_job: self }) ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self }) do params = serialized_params.merge( "provider_job_id" => id ) ActiveJob::Base.execute(params) destroy! end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
good_job-0.8.2 | lib/good_job/job.rb |
good_job-0.8.1 | lib/good_job/job.rb |