lib/good_job/scheduler.rb in good_job-0.2.0 vs lib/good_job/scheduler.rb in good_job-0.2.1
- old
+ new
@@ -7,24 +7,24 @@
MAX_THREADS = Concurrent.processor_count
DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
- run_now: true
+ run_now: true,
}.freeze
DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
auto_terminate: true,
idletime: 0,
max_queue: 0,
- fallback_policy: :abort # shouldn't matter -- 0 max queue
+ fallback_policy: :abort, # shouldn't matter -- 0 max queue
}.freeze
- def initialize(query = GoodJob::Job.all, **options)
+ def initialize(query = GoodJob::Job.all, **_options)
@query = query
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
idle_threads = @pool.max_length - @pool.length
@@ -49,22 +49,28 @@
if @pool.running?
@pool.shutdown
@pool.wait_for_termination if wait
end
+
+ true
end
def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
- while good_job = query.with_advisory_lock.first
+ loop do
+ good_job = query.with_advisory_lock.first
+ break unless good_job
+
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
JobWrapper.new(good_job).perform
good_job.advisory_unlock
end
end
+
true
end
future.add_observer(TaskObserver.new)
future.execute
end