lib/good_job/scheduler.rb in good_job-1.0.1 vs lib/good_job/scheduler.rb in good_job-1.0.2

- old
+ new

@@ -14,22 +14,21 @@ name: 'good_job', min_threads: 0, max_threads: Concurrent.processor_count, auto_terminate: true, idletime: 60, - max_queue: 0, - fallback_policy: :abort, # shouldn't matter -- 0 max queue + max_queue: -1, + fallback_policy: :discard, }.freeze def initialize(performer, timer_options: {}, pool_options: {}) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) @performer = performer - @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options)) + @pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options)) @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do - idle_threads = @pool.max_length - @pool.length - create_thread if idle_threads.positive? + create_thread end @timer.add_observer(self, :timer_observer) @timer.execute end @@ -56,10 +55,12 @@ def shutdown? @_shutdown end def create_thread + return false unless @pool.ready_worker_count.positive? + future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| output = nil Rails.application.executor.wrap { output = performer.next } output end @@ -72,8 +73,20 @@ end def task_observer(time, output, thread_error) ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time }) create_thread if output + end + + class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor + # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437 + def ready_worker_count + synchronize do + workers_still_to_be_created = @max_length - @pool.length + workers_created_but_waiting = @ready.length + + workers_still_to_be_created + workers_created_but_waiting + end + end end end end