lib/good_job/scheduler.rb in good_job-0.1.0 vs lib/good_job/scheduler.rb in good_job-0.2.0
- old
+ new
@@ -2,43 +2,44 @@
require "concurrent/executor/thread_pool_executor"
require "concurrent/utility/processor_counter"
module GoodJob
class Scheduler
- MINIMUM_EXECUTION_INTERVAL = 0.1
+ MAX_THREADS = Concurrent.processor_count
DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true
}.freeze
- MAX_THREADS = Concurrent.processor_count
-
DEFAULT_POOL_OPTIONS = {
- name: 'good_job',
- min_threads: 0,
- max_threads: MAX_THREADS,
- auto_terminate: true,
- idletime: 0,
- max_queue: 0,
- fallback_policy: :abort # shouldn't matter -- 0 max queue
+ name: 'good_job',
+ min_threads: 0,
+ max_threads: MAX_THREADS,
+ auto_terminate: true,
+ idletime: 0,
+ max_queue: 0,
+ fallback_policy: :abort # shouldn't matter -- 0 max queue
}.freeze
def initialize(query = GoodJob::Job.all, **options)
@query = query
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
idle_threads = @pool.max_length - @pool.length
- puts "There are idle_threads: #{idle_threads}"
create_thread if idle_threads.positive?
- true
end
+ @timer.add_observer(TimerObserver.new)
@timer.execute
end
+ def ordered_query
+ @query.where("scheduled_at < ?", Time.current).or(@query.where(scheduled_at: nil)).order(priority: :desc)
+ end
+
def execute
end
def shutdown(wait: true)
if @timer.running?
@@ -51,39 +52,34 @@
@pool.wait_for_termination if wait
end
end
def create_thread
- future = Concurrent::Future.new(args: [@query], executor: @pool) do |query|
+ future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
- thread_name = Thread.current.name || Thread.current.object_id
- while job = query.with_advisory_lock.first
- puts "Executing job #{job.id} in thread #{thread_name}"
+ while good_job = query.with_advisory_lock.first
+ ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
- JobWrapper.new(job).perform
+ JobWrapper.new(good_job).perform
- job.advisory_unlock
+ good_job.advisory_unlock
end
- true
end
+ true
end
- future.add_observer(TaskObserver.new(self))
+ future.add_observer(TaskObserver.new)
future.execute
end
- class TaskObserver
- def initialize(scheduler)
- @scheduler = scheduler
+ class TimerObserver
+ def update(time, result, error)
+ ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time })
end
+ end
- def update(time, result, ex)
- if result
- puts "(#{time}) Execution successfully returned #{result}\n"
- elsif ex.is_a?(Concurrent::TimeoutError)
- puts "(#{time}) Execution timed out\n"
- else
- puts "(#{time}) Execution failed with error #{result} #{ex}\n"
- end
+ class TaskObserver
+ def update(time, result, error)
+ ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end
end
end
end