lib/good_job/adapter.rb in good_job-3.19.4 vs lib/good_job/adapter.rb in good_job-3.20.0

- old
+ new

@@ -47,78 +47,80 @@ # @return [Integer] number of jobs that were successfully enqueued def enqueue_all(active_jobs) active_jobs = Array(active_jobs) return 0 if active_jobs.empty? - current_time = Time.current - executions = active_jobs.map do |active_job| - GoodJob::Execution.build_for_enqueue(active_job).tap do |execution| - if GoodJob::Execution.discrete_support? - execution.make_discrete - execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at - end + Rails.application.reloader.wrap do + current_time = Time.current + executions = active_jobs.map do |active_job| + GoodJob::Execution.build_for_enqueue(active_job).tap do |execution| + if GoodJob::Execution.discrete_support? + execution.make_discrete + execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at + end - execution.created_at = current_time - execution.updated_at = current_time + execution.created_at = current_time + execution.updated_at = current_time + end end - end - inline_executions = [] - GoodJob::Execution.transaction(requires_new: true, joinable: false) do - execution_attributes = executions.map do |execution| - if GoodJob::Execution.error_event_migrated? - execution.attributes - else - execution.attributes.except('error_event') + inline_executions = [] + GoodJob::Execution.transaction(requires_new: true, joinable: false) do + execution_attributes = executions.map do |execution| + if GoodJob::Execution.error_event_migrated? + execution.attributes + else + execution.attributes.except('error_event') + end end - end - results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations + results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations - job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] } - active_jobs.each do |active_job| - active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id] - active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=) - end - executions.each do |execution| - execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id] - end - executions = executions.select(&:persisted?) # prune unpersisted executions + job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] } + active_jobs.each do |active_job| + active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id] + active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=) + end + executions.each do |execution| + execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id] + end + executions = executions.select(&:persisted?) # prune unpersisted executions - if execute_inline? - inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) } - inline_executions.each(&:advisory_lock!) + if execute_inline? + inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) } + inline_executions.each(&:advisory_lock!) + end end - end - begin - until inline_executions.empty? - begin - inline_execution = inline_executions.shift - inline_result = inline_execution.perform - ensure - inline_execution.advisory_unlock - inline_execution.run_callbacks(:perform_unlocked) + begin + until inline_executions.empty? + begin + inline_execution = inline_executions.shift + inline_result = inline_execution.perform + ensure + inline_execution.advisory_unlock + inline_execution.run_callbacks(:perform_unlocked) + end + raise inline_result.unhandled_error if inline_result.unhandled_error end - raise inline_result.unhandled_error if inline_result.unhandled_error + ensure + inline_executions.each(&:advisory_unlock) end - ensure - inline_executions.each(&:advisory_unlock) - end - non_inline_executions = executions.reject(&:finished_at) - if non_inline_executions.any? - job_id_to_active_jobs = active_jobs.index_by(&:job_id) - non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue| - executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at| - state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size } - state[:scheduled_at] = scheduled_at if scheduled_at + non_inline_executions = executions.reject(&:finished_at) + if non_inline_executions.any? + job_id_to_active_jobs = active_jobs.index_by(&:job_id) + non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue| + executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at| + state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size } + state[:scheduled_at] = scheduled_at if scheduled_at - executed_locally = execute_async? && @capsule&.create_thread(state) - unless executed_locally - state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } - Notifier.notify(state) unless state[:count].zero? + executed_locally = execute_async? && @capsule&.create_thread(state) + unless executed_locally + state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } + Notifier.notify(state) unless state[:count].zero? + end end end end end @@ -135,33 +137,35 @@ # If there is a currently open Bulk in the current thread, direct the # job there to be enqueued using enqueue_all return if GoodJob::Bulk.capture(active_job, queue_adapter: self) - will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current) - execution = GoodJob::Execution.enqueue( - active_job, - scheduled_at: scheduled_at, - create_with_advisory_lock: will_execute_inline - ) + Rails.application.reloader.wrap do + will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current) + execution = GoodJob::Execution.enqueue( + active_job, + scheduled_at: scheduled_at, + create_with_advisory_lock: will_execute_inline + ) - if will_execute_inline - begin - result = execution.perform - ensure - execution.advisory_unlock - execution.run_callbacks(:perform_unlocked) + if will_execute_inline + begin + result = execution.perform + ensure + execution.advisory_unlock + execution.run_callbacks(:perform_unlocked) + end + raise result.unhandled_error if result.unhandled_error + else + job_state = { queue_name: execution.queue_name } + job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at + + executed_locally = execute_async? && @capsule&.create_thread(job_state) + Notifier.notify(job_state) if !executed_locally && send_notify?(active_job) end - raise result.unhandled_error if result.unhandled_error - else - job_state = { queue_name: execution.queue_name } - job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at - executed_locally = execute_async? && @capsule&.create_thread(job_state) - Notifier.notify(job_state) if !executed_locally && send_notify?(active_job) + execution end - - execution end # Shut down the thread pool executors. # @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads. # * +nil+ trigger a shutdown but not wait for it to complete.