lib/good_job/scheduler.rb in good_job-1.0.3 vs lib/good_job/scheduler.rb in good_job-1.1.0
- old
+ new
@@ -18,46 +18,52 @@
idletime: 60,
max_queue: -1,
fallback_policy: :discard,
}.freeze
+ cattr_reader :instances, default: [], instance_reader: false
+
def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
+ self.class.instances << self
+
@performer = performer
- @pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
- @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
- create_thread
- end
- @timer.add_observer(self, :timer_observer)
- @timer.execute
- end
+ @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
+ @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)
- def execute
+ create_pools
end
def shutdown(wait: true)
@_shutdown = true
- ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
- ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
- if @timer.running?
+ ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id })
+ ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do
+ if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
end
- if @pool.running?
+ if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
end
end
end
def shutdown?
@_shutdown
end
+ def restart(wait: true)
+ ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do
+ shutdown(wait: wait) unless shutdown?
+ create_pools
+ end
+ end
+
def create_thread
return false unless @pool.ready_worker_count.positive?
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
@@ -67,14 +73,16 @@
future.add_observer(self, :task_observer)
future.execute
end
def timer_observer(time, executed_task, thread_error)
+ GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time })
end
def task_observer(time, output, thread_error)
+ GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
create_thread if output
end
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
@@ -85,8 +93,29 @@
workers_created_but_waiting = @ready.length
workers_still_to_be_created + workers_created_but_waiting
end
end
+ end
+
+ private
+
+ def create_pools
+ ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do
+ @pool = ThreadPoolExecutor.new(@pool_options)
+ next unless @timer_options[:execution_interval].positive?
+
+ @timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
+ @timer.add_observer(self, :timer_observer)
+ @timer.execute
+ end
+ end
+
+ def process_id
+ Process.pid
+ end
+
+ def thread_name
+ Thread.current.name || Thread.current.object_id
end
end
end