lib/good_job/adapter.rb in good_job-3.12.5 vs lib/good_job/adapter.rb in good_job-3.12.6

- old
+ new

@@ -22,13 +22,14 @@ # # - +development+: +:async:+ # -+test+: +:inline+ # - +production+ and all other environments: +:external+ # - def initialize(execution_mode: nil) + def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable Lint/UnderscorePrefixedVariableName @_execution_mode_override = execution_mode GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override + @capsule = _capsule self.class.instances << self start_async if GoodJob.async_ready? end @@ -96,11 +97,11 @@ non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue| executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at| state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size } state[:scheduled_at] = scheduled_at if scheduled_at - executed_locally = execute_async? && @scheduler&.create_thread(state) + executed_locally = execute_async? && @capsule&.create_thread(state) unless executed_locally state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } Notifier.notify(state) unless state[:count].zero? end end @@ -139,33 +140,26 @@ raise result.unhandled_error if result.unhandled_error else job_state = { queue_name: execution.queue_name } job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at - executed_locally = execute_async? && @scheduler&.create_thread(job_state) + executed_locally = execute_async? && @capsule&.create_thread(job_state) Notifier.notify(job_state) if !executed_locally && send_notify?(active_job) end execution end # Shut down the thread pool executors. # @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads. - # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. - # * +-1+, the scheduler will wait until the shutdown is complete. - # * +0+, the scheduler will immediately shutdown and stop any threads. + # * +nil+ trigger a shutdown but not wait for it to complete. + # * +-1+ wait until the shutdown is complete. + # * +0+ immediately shutdown and stop any threads. # * A positive number will wait that many seconds before stopping any remaining active threads. # @return [void] def shutdown(timeout: :default) - timeout = if timeout == :default - GoodJob.configuration.shutdown_timeout - else - timeout - end - - executables = [@notifier, @poller, @scheduler].compact - GoodJob._shutdown_all(executables, timeout: timeout) + @capsule&.shutdown(timeout: timeout) @_async_started = false end # This adapter's execution mode # @return [Symbol, nil] @@ -197,17 +191,10 @@ # Start async executors # @return [void] def start_async return unless execute_async? - @notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify) - @poller = GoodJob::Poller.new(poll_interval: GoodJob.configuration.poll_interval) - @scheduler = GoodJob::Scheduler.from_configuration(GoodJob.configuration, warm_cache_on_initialize: true) - @notifier.recipients << [@scheduler, :create_thread] - @poller.recipients << [@scheduler, :create_thread] - - @cron_manager = GoodJob::CronManager.new(GoodJob.configuration.cron_entries, start_on_initialize: true) if GoodJob.configuration.enable_cron? - + @capsule.start @_async_started = true end # Whether the async executors are running # @return [Boolean]