lib/good_job/adapter.rb in good_job-3.19.4 vs lib/good_job/adapter.rb in good_job-3.20.0
- old
+ new
@@ -47,78 +47,80 @@
# @return [Integer] number of jobs that were successfully enqueued
def enqueue_all(active_jobs)
active_jobs = Array(active_jobs)
return 0 if active_jobs.empty?
- current_time = Time.current
- executions = active_jobs.map do |active_job|
- GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
- if GoodJob::Execution.discrete_support?
- execution.make_discrete
- execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
- end
+ Rails.application.reloader.wrap do
+ current_time = Time.current
+ executions = active_jobs.map do |active_job|
+ GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
+ if GoodJob::Execution.discrete_support?
+ execution.make_discrete
+ execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
+ end
- execution.created_at = current_time
- execution.updated_at = current_time
+ execution.created_at = current_time
+ execution.updated_at = current_time
+ end
end
- end
- inline_executions = []
- GoodJob::Execution.transaction(requires_new: true, joinable: false) do
- execution_attributes = executions.map do |execution|
- if GoodJob::Execution.error_event_migrated?
- execution.attributes
- else
- execution.attributes.except('error_event')
+ inline_executions = []
+ GoodJob::Execution.transaction(requires_new: true, joinable: false) do
+ execution_attributes = executions.map do |execution|
+ if GoodJob::Execution.error_event_migrated?
+ execution.attributes
+ else
+ execution.attributes.except('error_event')
+ end
end
- end
- results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
+ results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
- job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
- active_jobs.each do |active_job|
- active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
- active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
- end
- executions.each do |execution|
- execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
- end
- executions = executions.select(&:persisted?) # prune unpersisted executions
+ job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
+ active_jobs.each do |active_job|
+ active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
+ active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
+ end
+ executions.each do |execution|
+ execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
+ end
+ executions = executions.select(&:persisted?) # prune unpersisted executions
- if execute_inline?
- inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
- inline_executions.each(&:advisory_lock!)
+ if execute_inline?
+ inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
+ inline_executions.each(&:advisory_lock!)
+ end
end
- end
- begin
- until inline_executions.empty?
- begin
- inline_execution = inline_executions.shift
- inline_result = inline_execution.perform
- ensure
- inline_execution.advisory_unlock
- inline_execution.run_callbacks(:perform_unlocked)
+ begin
+ until inline_executions.empty?
+ begin
+ inline_execution = inline_executions.shift
+ inline_result = inline_execution.perform
+ ensure
+ inline_execution.advisory_unlock
+ inline_execution.run_callbacks(:perform_unlocked)
+ end
+ raise inline_result.unhandled_error if inline_result.unhandled_error
end
- raise inline_result.unhandled_error if inline_result.unhandled_error
+ ensure
+ inline_executions.each(&:advisory_unlock)
end
- ensure
- inline_executions.each(&:advisory_unlock)
- end
- non_inline_executions = executions.reject(&:finished_at)
- if non_inline_executions.any?
- job_id_to_active_jobs = active_jobs.index_by(&:job_id)
- non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
- executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
- state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
- state[:scheduled_at] = scheduled_at if scheduled_at
+ non_inline_executions = executions.reject(&:finished_at)
+ if non_inline_executions.any?
+ job_id_to_active_jobs = active_jobs.index_by(&:job_id)
+ non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
+ executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
+ state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
+ state[:scheduled_at] = scheduled_at if scheduled_at
- executed_locally = execute_async? && @capsule&.create_thread(state)
- unless executed_locally
- state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
- Notifier.notify(state) unless state[:count].zero?
+ executed_locally = execute_async? && @capsule&.create_thread(state)
+ unless executed_locally
+ state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
+ Notifier.notify(state) unless state[:count].zero?
+ end
end
end
end
end
@@ -135,33 +137,35 @@
# If there is a currently open Bulk in the current thread, direct the
# job there to be enqueued using enqueue_all
return if GoodJob::Bulk.capture(active_job, queue_adapter: self)
- will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
- execution = GoodJob::Execution.enqueue(
- active_job,
- scheduled_at: scheduled_at,
- create_with_advisory_lock: will_execute_inline
- )
+ Rails.application.reloader.wrap do
+ will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
+ execution = GoodJob::Execution.enqueue(
+ active_job,
+ scheduled_at: scheduled_at,
+ create_with_advisory_lock: will_execute_inline
+ )
- if will_execute_inline
- begin
- result = execution.perform
- ensure
- execution.advisory_unlock
- execution.run_callbacks(:perform_unlocked)
+ if will_execute_inline
+ begin
+ result = execution.perform
+ ensure
+ execution.advisory_unlock
+ execution.run_callbacks(:perform_unlocked)
+ end
+ raise result.unhandled_error if result.unhandled_error
+ else
+ job_state = { queue_name: execution.queue_name }
+ job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at
+
+ executed_locally = execute_async? && @capsule&.create_thread(job_state)
+ Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end
- raise result.unhandled_error if result.unhandled_error
- else
- job_state = { queue_name: execution.queue_name }
- job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at
- executed_locally = execute_async? && @capsule&.create_thread(job_state)
- Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
+ execution
end
-
- execution
end
# Shut down the thread pool executors.
# @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads.
# * +nil+ trigger a shutdown but not wait for it to complete.