lib/good_job/adapter.rb in good_job-4.1.0 vs lib/good_job/adapter.rb in good_job-4.1.1
- old
+ new
@@ -55,74 +55,68 @@
active_jobs = Array(active_jobs)
return 0 if active_jobs.empty?
Rails.application.executor.wrap do
current_time = Time.current
- executions = active_jobs.map do |active_job|
+ jobs = active_jobs.map do |active_job|
GoodJob::Job.build_for_enqueue(active_job).tap do |job|
job.scheduled_at = current_time if job.scheduled_at == job.created_at
job.created_at = current_time
job.updated_at = current_time
end
end
- inline_executions = []
+ inline_jobs = []
GoodJob::Job.transaction(requires_new: true, joinable: false) do
- execution_attributes = executions.map(&:attributes)
- results = GoodJob::Job.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
+ job_attributes = jobs.map(&:attributes)
+ results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
active_jobs.each do |active_job|
active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
end
- executions.each do |execution|
- execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
+ jobs.each do |job|
+ job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id]
end
- executions = executions.select(&:persisted?) # prune unpersisted executions
+ jobs = jobs.select(&:persisted?) # prune unpersisted jobs
if execute_inline?
- inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
- inline_executions.each(&:advisory_lock!)
+ inline_jobs = jobs.select { |job| job.scheduled_at.nil? || job.scheduled_at <= Time.current }
+ inline_jobs.each(&:advisory_lock!)
end
end
- @capsule.tracker.register
- begin
- until inline_executions.empty?
- begin
- inline_execution = inline_executions.shift
- inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)
-
- retried_execution = inline_result.retried
- while retried_execution && retried_execution.scheduled_at <= Time.current
- inline_execution = retried_execution
- inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)
- retried_execution = inline_result.retried
+ if inline_jobs.any?
+ deferred = InlineBuffer.defer?
+ InlineBuffer.perform_now_or_defer do
+ @capsule.tracker.register do
+ until inline_jobs.empty?
+ inline_job = inline_jobs.shift
+ perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false)
end
ensure
- inline_execution.advisory_unlock
- inline_execution.run_callbacks(:perform_unlocked)
+ inline_jobs.each(&:advisory_unlock)
end
- raise inline_result.unhandled_error if inline_result.unhandled_error
end
- ensure
- @capsule.tracker.unregister
- inline_executions.each(&:advisory_unlock)
end
- non_inline_executions = executions.reject(&:finished_at)
- if non_inline_executions.any?
+ non_inline_jobs = if InlineBuffer.defer?
+ jobs - inline_jobs
+ else
+ jobs.reject(&:finished_at)
+ end
+ if non_inline_jobs.any?
job_id_to_active_jobs = active_jobs.index_by(&:job_id)
- non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
- executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
- state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
+ non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue|
+ jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at|
+ state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at
executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
- state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
+ state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
end
end
end
end
@@ -146,47 +140,36 @@
Rails.application.executor.wrap do
will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
will_retry_inline = will_execute_inline && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now
if will_retry_inline
- execution = GoodJob::Job.enqueue(
+ job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at
)
elsif will_execute_inline
- execution = GoodJob::Job.enqueue(
+ job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at,
create_with_advisory_lock: true
)
- begin
- result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) }
-
- retried_execution = result.retried
- while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current)
- execution = retried_execution
- result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) }
- retried_execution = result.retried
+ InlineBuffer.perform_now_or_defer do
+ @capsule.tracker.register do
+ perform_inline(job, notify: send_notify?(active_job))
end
-
- Notifier.notify(retried_execution.job_state) if retried_execution&.scheduled_at && retried_execution.scheduled_at > Time.current && send_notify?(active_job)
- ensure
- execution.advisory_unlock
- execution.run_callbacks(:perform_unlocked)
end
- raise result.unhandled_error if result.unhandled_error
else
- execution = GoodJob::Job.enqueue(
+ job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at
)
- executed_locally = execute_async? && @capsule&.create_thread(execution.job_state)
- Notifier.notify(execution.job_state) if !executed_locally && send_notify?(active_job)
+ executed_locally = execute_async? && @capsule&.create_thread(job.job_state)
+ Notifier.notify(job.job_state) if !executed_locally && send_notify?(active_job)
end
- execution
+ job
end
end
# Shut down the thread pool executors.
# @param timeout [nil, Numeric, NONE] Seconds to wait for active threads.
@@ -247,8 +230,31 @@
def send_notify?(active_job)
return false unless GoodJob.configuration.enable_listen_notify
return true unless active_job.respond_to?(:good_job_notify)
!(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?))
+ end
+
+ # @param job [GoodJob::Job] the job to perform, which must be enqueued and advisory locked already
+ # @param notify [Boolean] whether to send a NOTIFY event for a retried job
+ def perform_inline(job, notify: true)
+ result = nil
+ retried_job = nil
+
+ loop do
+ result = job.perform(lock_id: @capsule.tracker.id_for_lock)
+ retried_job = result.retried_job
+ break if retried_job.nil? || retried_job.scheduled_at.nil? || retried_job.scheduled_at > Time.current
+
+ job = retried_job
+ end
+
+ Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current
+ result
+ ensure
+ job.advisory_unlock
+ job.run_callbacks(:perform_unlocked)
+
+ raise result.unhandled_error if result.unhandled_error
end
end
end