lib/good_job/scheduler.rb in good_job-0.2.0 vs lib/good_job/scheduler.rb in good_job-0.2.1

- old
+ new

@@ -7,24 +7,24 @@ MAX_THREADS = Concurrent.processor_count DEFAULT_TIMER_OPTIONS = { execution_interval: 1, timeout_interval: 1, - run_now: true + run_now: true, }.freeze DEFAULT_POOL_OPTIONS = { name: 'good_job', min_threads: 0, max_threads: MAX_THREADS, auto_terminate: true, idletime: 0, max_queue: 0, - fallback_policy: :abort # shouldn't matter -- 0 max queue + fallback_policy: :abort, # shouldn't matter -- 0 max queue }.freeze - def initialize(query = GoodJob::Job.all, **options) + def initialize(query = GoodJob::Job.all, **_options) @query = query @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS) @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do idle_threads = @pool.max_length - @pool.length @@ -49,22 +49,28 @@ if @pool.running? @pool.shutdown @pool.wait_for_termination if wait end + + true end def create_thread future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query| Rails.application.executor.wrap do - while good_job = query.with_advisory_lock.first + loop do + good_job = query.with_advisory_lock.first + break unless good_job + ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job }) JobWrapper.new(good_job).perform good_job.advisory_unlock end end + true end future.add_observer(TaskObserver.new) future.execute end