lib/good_job/scheduler.rb in good_job-1.0.1 vs lib/good_job/scheduler.rb in good_job-1.0.2
- old
+ new
@@ -14,22 +14,21 @@
name: 'good_job',
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60,
- max_queue: 0,
- fallback_policy: :abort, # shouldn't matter -- 0 max queue
+ max_queue: -1,
+ fallback_policy: :discard,
}.freeze
def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
@performer = performer
- @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
+ @pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
- idle_threads = @pool.max_length - @pool.length
- create_thread if idle_threads.positive?
+ create_thread
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end
@@ -56,10 +55,12 @@
def shutdown?
@_shutdown
end
def create_thread
+ return false unless @pool.ready_worker_count.positive?
+
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
@@ -72,8 +73,20 @@
end
def task_observer(time, output, thread_error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
create_thread if output
+ end
+
+ class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
+ # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
+ def ready_worker_count
+ synchronize do
+ workers_still_to_be_created = @max_length - @pool.length
+ workers_created_but_waiting = @ready.length
+
+ workers_still_to_be_created + workers_created_but_waiting
+ end
+ end
end
end
end