lib/good_job/job.rb in good_job-0.8.2 vs lib/good_job/job.rb in good_job-0.9.0

- old
+ new

@@ -1,37 +1,51 @@ module GoodJob class Job < ActiveRecord::Base include Lockable + PreviouslyPerformedError = Class.new(StandardError) + DEFAULT_QUEUE_NAME = 'default'.freeze DEFAULT_PRIORITY = 0 self.table_name = 'good_jobs'.freeze + scope :unfinished, (lambda do + if column_names.include?('finished_at') + where(finished_at: nil) + else + ActiveSupport::Deprecation.warn('GoodJob expects a good_jobs.finished_at column to exist. Please see the GoodJob README.md for migration instructions.') + nil + end + end) scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) } scope :priority_ordered, -> { order(priority: :desc) } - scope :to_performer, -> { Performer.new(self) } + scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) } - class Performer - def initialize(query) - @query = query - end + def self.perform_with_advisory_lock(destroy_after: !GoodJob.preserve_job_records) + good_job = nil + result = nil + error = nil - def next - good_job = nil + unfinished.only_scheduled.limit(1).with_advisory_lock do |good_jobs| + good_job = good_jobs.first + break unless good_job - @query.only_scheduled.limit(1).with_advisory_lock do |good_jobs| - good_job = good_jobs.first - break unless good_job + result, error = good_job.perform(destroy_after: destroy_after) + end - good_job.perform - end + [good_job, result, error] if good_job + end - good_job - end + def self.perform_with_advisory_lock_and_preserve_job_records + perform_with_advisory_lock(destroy_after: false) end + def self.perform_with_advisory_lock_and_destroy_job_records + perform_with_advisory_lock(destroy_after: true) + end + def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) good_job = nil ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job = GoodJob::Job.new( queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, @@ -48,18 +62,45 @@ end good_job end - def perform + def perform(destroy_after: true) + raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + + result = nil + error = nil + ActiveSupport::Notifications.instrument("before_perform_job.good_job", { good_job: self }) + self.performed_at = Time.current + save! unless destroy_after + ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self }) do params = serialized_params.merge( "provider_job_id" => id ) - ActiveJob::Base.execute(params) + begin + result = ActiveJob::Base.execute(params) + rescue StandardError => e + error = e + end + end + if error.nil? && result.is_a?(Exception) + error = result + result = nil + end + + error_message = "#{error.class}: #{error.message}" if error + self.error = error_message + self.finished_at = Time.current + + if destroy_after destroy! + else + save! end + + [result, error] end end end