lib/good_job/execution.rb in good_job-2.4.2 vs lib/good_job/execution.rb in good_job-2.5.0

- old
+ new

@@ -8,10 +8,13 @@ include Lockable # Raised if something attempts to execute a previously completed Execution again. PreviouslyPerformedError = Class.new(StandardError) + # String separating Error Class from Error Message + ERROR_MESSAGE_SEPARATOR = ": " + # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. DEFAULT_QUEUE_NAME = 'default' # ActiveJob jobs without a +priority+ attribute are given this priority. DEFAULT_PRIORITY = 0 @@ -48,10 +51,20 @@ else { include: queues } end end + def self._migration_pending_warning + ActiveSupport::Deprecation.warn(<<~DEPRECATION) + GoodJob has pending database migrations. To create the migration files, run: + rails generate good_job:update + To apply the migration files, run: + rails db:migrate + DEPRECATION + nil + end + # Get Jobs with given ActiveJob ID # @!method active_job_id # @!scope class # @param active_job_id [String] # ActiveJob ID @@ -220,22 +233,30 @@ execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) if CurrentThread.cron_key execution_args[:cron_key] = CurrentThread.cron_key - elsif CurrentThread.active_job_id == active_job.job_id + + @cron_at_index = column_names.include?('cron_at') && connection.index_name_exists?(:good_jobs, :index_good_jobs_on_cron_key_and_cron_at) unless instance_variable_defined?(:@cron_at_index) + + if @cron_at_index + execution_args[:cron_at] = CurrentThread.cron_at + else + _migration_pending_warning + end + elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id execution_args[:cron_key] = CurrentThread.execution.cron_key end execution = GoodJob::Execution.new(**execution_args) instrument_payload[:execution] = execution execution.save! active_job.provider_job_id = execution.id - CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.execution && CurrentThread.execution.active_job_id == active_job.job_id + CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id execution end end @@ -251,11 +272,11 @@ save! if GoodJob.preserve_job_records result = execute job_error = result.handled_error || result.unhandled_error - self.error = "#{job_error.class}: #{job_error.message}" if job_error + self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error if result.unhandled_error && GoodJob.retry_on_unhandled_error save! elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) self.finished_at = Time.current @@ -271,22 +292,30 @@ # @return [Boolean] def executable? self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) end + def active_job + ActiveJob::Base.deserialize(active_job_data) + end + private + def active_job_data + serialized_params.deep_dup + .tap do |job_data| + job_data["provider_job_id"] = id + end + end + # @return [ExecutionResult] def execute GoodJob::CurrentThread.reset GoodJob::CurrentThread.execution = self - job_data = serialized_params.deep_dup - job_data["provider_job_id"] = id - # DEPRECATION: Remove deprecated `good_job:` parameter in GoodJob v3 ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, execution: self, process_id: GoodJob::CurrentThread.process_id, thread_name: GoodJob::CurrentThread.thread_name }) do - value = ActiveJob::Base.execute(job_data) + value = ActiveJob::Base.execute(active_job_data) if value.is_a?(Exception) handled_error = value value = nil end