lib/good_job/execution.rb in good_job-2.4.2 vs lib/good_job/execution.rb in good_job-2.5.0
- old
+ new
@@ -8,10 +8,13 @@
include Lockable
# Raised if something attempts to execute a previously completed Execution again.
PreviouslyPerformedError = Class.new(StandardError)
+ # String separating Error Class from Error Message
+ ERROR_MESSAGE_SEPARATOR = ": "
+
# ActiveJob jobs without a +queue_name+ attribute are placed on this queue.
DEFAULT_QUEUE_NAME = 'default'
# ActiveJob jobs without a +priority+ attribute are given this priority.
DEFAULT_PRIORITY = 0
@@ -48,10 +51,20 @@
else
{ include: queues }
end
end
+ def self._migration_pending_warning
+ ActiveSupport::Deprecation.warn(<<~DEPRECATION)
+ GoodJob has pending database migrations. To create the migration files, run:
+ rails generate good_job:update
+ To apply the migration files, run:
+ rails db:migrate
+ DEPRECATION
+ nil
+ end
+
# Get Jobs with given ActiveJob ID
# @!method active_job_id
# @!scope class
# @param active_job_id [String]
# ActiveJob ID
@@ -220,22 +233,30 @@
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
if CurrentThread.cron_key
execution_args[:cron_key] = CurrentThread.cron_key
- elsif CurrentThread.active_job_id == active_job.job_id
+
+ @cron_at_index = column_names.include?('cron_at') && connection.index_name_exists?(:good_jobs, :index_good_jobs_on_cron_key_and_cron_at) unless instance_variable_defined?(:@cron_at_index)
+
+ if @cron_at_index
+ execution_args[:cron_at] = CurrentThread.cron_at
+ else
+ _migration_pending_warning
+ end
+ elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution_args[:cron_key] = CurrentThread.execution.cron_key
end
execution = GoodJob::Execution.new(**execution_args)
instrument_payload[:execution] = execution
execution.save!
active_job.provider_job_id = execution.id
- CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.execution && CurrentThread.execution.active_job_id == active_job.job_id
+ CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution
end
end
@@ -251,11 +272,11 @@
save! if GoodJob.preserve_job_records
result = execute
job_error = result.handled_error || result.unhandled_error
- self.error = "#{job_error.class}: #{job_error.message}" if job_error
+ self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
self.finished_at = Time.current
@@ -271,22 +292,30 @@
# @return [Boolean]
def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end
+ def active_job
+ ActiveJob::Base.deserialize(active_job_data)
+ end
+
private
+ def active_job_data
+ serialized_params.deep_dup
+ .tap do |job_data|
+ job_data["provider_job_id"] = id
+ end
+ end
+
# @return [ExecutionResult]
def execute
GoodJob::CurrentThread.reset
GoodJob::CurrentThread.execution = self
- job_data = serialized_params.deep_dup
- job_data["provider_job_id"] = id
-
# DEPRECATION: Remove deprecated `good_job:` parameter in GoodJob v3
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, execution: self, process_id: GoodJob::CurrentThread.process_id, thread_name: GoodJob::CurrentThread.thread_name }) do
- value = ActiveJob::Base.execute(job_data)
+ value = ActiveJob::Base.execute(active_job_data)
if value.is_a?(Exception)
handled_error = value
value = nil
end