lib/good_job/adapter.rb in good_job-4.1.0 vs lib/good_job/adapter.rb in good_job-4.1.1

- old
+ new

@@ -55,74 +55,68 @@ active_jobs = Array(active_jobs) return 0 if active_jobs.empty? Rails.application.executor.wrap do current_time = Time.current - executions = active_jobs.map do |active_job| + jobs = active_jobs.map do |active_job| GoodJob::Job.build_for_enqueue(active_job).tap do |job| job.scheduled_at = current_time if job.scheduled_at == job.created_at job.created_at = current_time job.updated_at = current_time end end - inline_executions = [] + inline_jobs = [] GoodJob::Job.transaction(requires_new: true, joinable: false) do - execution_attributes = executions.map(&:attributes) - results = GoodJob::Job.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations + job_attributes = jobs.map(&:attributes) + results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] } active_jobs.each do |active_job| active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id] active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=) end - executions.each do |execution| - execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id] + jobs.each do |job| + job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id] end - executions = executions.select(&:persisted?) # prune unpersisted executions + jobs = jobs.select(&:persisted?) # prune unpersisted jobs if execute_inline? - inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) } - inline_executions.each(&:advisory_lock!) + inline_jobs = jobs.select { |job| job.scheduled_at.nil? || job.scheduled_at <= Time.current } + inline_jobs.each(&:advisory_lock!) end end - @capsule.tracker.register - begin - until inline_executions.empty? - begin - inline_execution = inline_executions.shift - inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) - - retried_execution = inline_result.retried - while retried_execution && retried_execution.scheduled_at <= Time.current - inline_execution = retried_execution - inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) - retried_execution = inline_result.retried + if inline_jobs.any? + deferred = InlineBuffer.defer? + InlineBuffer.perform_now_or_defer do + @capsule.tracker.register do + until inline_jobs.empty? + inline_job = inline_jobs.shift + perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false) end ensure - inline_execution.advisory_unlock - inline_execution.run_callbacks(:perform_unlocked) + inline_jobs.each(&:advisory_unlock) end - raise inline_result.unhandled_error if inline_result.unhandled_error end - ensure - @capsule.tracker.unregister - inline_executions.each(&:advisory_unlock) end - non_inline_executions = executions.reject(&:finished_at) - if non_inline_executions.any? + non_inline_jobs = if InlineBuffer.defer? + jobs - inline_jobs + else + jobs.reject(&:finished_at) + end + if non_inline_jobs.any? job_id_to_active_jobs = active_jobs.index_by(&:job_id) - non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue| - executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at| - state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size } + non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue| + jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at| + state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size } state[:scheduled_at] = scheduled_at if scheduled_at executed_locally = execute_async? && @capsule&.create_thread(state) unless executed_locally - state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } + state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } Notifier.notify(state) unless state[:count].zero? end end end end @@ -146,47 +140,36 @@ Rails.application.executor.wrap do will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current) will_retry_inline = will_execute_inline && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now if will_retry_inline - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at ) elsif will_execute_inline - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at, create_with_advisory_lock: true ) - begin - result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } - - retried_execution = result.retried - while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current) - execution = retried_execution - result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } - retried_execution = result.retried + InlineBuffer.perform_now_or_defer do + @capsule.tracker.register do + perform_inline(job, notify: send_notify?(active_job)) end - - Notifier.notify(retried_execution.job_state) if retried_execution&.scheduled_at && retried_execution.scheduled_at > Time.current && send_notify?(active_job) - ensure - execution.advisory_unlock - execution.run_callbacks(:perform_unlocked) end - raise result.unhandled_error if result.unhandled_error else - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at ) - executed_locally = execute_async? && @capsule&.create_thread(execution.job_state) - Notifier.notify(execution.job_state) if !executed_locally && send_notify?(active_job) + executed_locally = execute_async? && @capsule&.create_thread(job.job_state) + Notifier.notify(job.job_state) if !executed_locally && send_notify?(active_job) end - execution + job end end # Shut down the thread pool executors. # @param timeout [nil, Numeric, NONE] Seconds to wait for active threads. @@ -247,8 +230,31 @@ def send_notify?(active_job) return false unless GoodJob.configuration.enable_listen_notify return true unless active_job.respond_to?(:good_job_notify) !(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?)) + end + + # @param job [GoodJob::Job] the job to perform, which must be enqueued and advisory locked already + # @param notify [Boolean] whether to send a NOTIFY event for a retried job + def perform_inline(job, notify: true) + result = nil + retried_job = nil + + loop do + result = job.perform(lock_id: @capsule.tracker.id_for_lock) + retried_job = result.retried_job + break if retried_job.nil? || retried_job.scheduled_at.nil? || retried_job.scheduled_at > Time.current + + job = retried_job + end + + Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current + result + ensure + job.advisory_unlock + job.run_callbacks(:perform_unlocked) + + raise result.unhandled_error if result.unhandled_error end end end