lib/good_job/adapter.rb in good_job-3.12.5 vs lib/good_job/adapter.rb in good_job-3.12.6
- old
+ new
@@ -22,13 +22,14 @@
#
# - +development+: +:async:+
# -+test+: +:inline+
# - +production+ and all other environments: +:external+
#
- def initialize(execution_mode: nil)
+ def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable Lint/UnderscorePrefixedVariableName
@_execution_mode_override = execution_mode
GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override
+ @capsule = _capsule
self.class.instances << self
start_async if GoodJob.async_ready?
end
@@ -96,11 +97,11 @@
non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at
- executed_locally = execute_async? && @scheduler&.create_thread(state)
+ executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
end
end
@@ -139,33 +140,26 @@
raise result.unhandled_error if result.unhandled_error
else
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at
- executed_locally = execute_async? && @scheduler&.create_thread(job_state)
+ executed_locally = execute_async? && @capsule&.create_thread(job_state)
Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end
execution
end
# Shut down the thread pool executors.
# @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads.
- # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
- # * +-1+, the scheduler will wait until the shutdown is complete.
- # * +0+, the scheduler will immediately shutdown and stop any threads.
+ # * +nil+ trigger a shutdown but not wait for it to complete.
+ # * +-1+ wait until the shutdown is complete.
+ # * +0+ immediately shutdown and stop any threads.
# * A positive number will wait that many seconds before stopping any remaining active threads.
# @return [void]
def shutdown(timeout: :default)
- timeout = if timeout == :default
- GoodJob.configuration.shutdown_timeout
- else
- timeout
- end
-
- executables = [@notifier, @poller, @scheduler].compact
- GoodJob._shutdown_all(executables, timeout: timeout)
+ @capsule&.shutdown(timeout: timeout)
@_async_started = false
end
# This adapter's execution mode
# @return [Symbol, nil]
@@ -197,17 +191,10 @@
# Start async executors
# @return [void]
def start_async
return unless execute_async?
- @notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
- @poller = GoodJob::Poller.new(poll_interval: GoodJob.configuration.poll_interval)
- @scheduler = GoodJob::Scheduler.from_configuration(GoodJob.configuration, warm_cache_on_initialize: true)
- @notifier.recipients << [@scheduler, :create_thread]
- @poller.recipients << [@scheduler, :create_thread]
-
- @cron_manager = GoodJob::CronManager.new(GoodJob.configuration.cron_entries, start_on_initialize: true) if GoodJob.configuration.enable_cron?
-
+ @capsule.start
@_async_started = true
end
# Whether the async executors are running
# @return [Boolean]