lib/good_job/scheduler.rb in good_job-0.1.0 vs lib/good_job/scheduler.rb in good_job-0.2.0

- old
+ new

@@ -2,43 +2,44 @@ require "concurrent/executor/thread_pool_executor" require "concurrent/utility/processor_counter" module GoodJob class Scheduler - MINIMUM_EXECUTION_INTERVAL = 0.1 + MAX_THREADS = Concurrent.processor_count DEFAULT_TIMER_OPTIONS = { execution_interval: 1, timeout_interval: 1, run_now: true }.freeze - MAX_THREADS = Concurrent.processor_count - DEFAULT_POOL_OPTIONS = { - name: 'good_job', - min_threads: 0, - max_threads: MAX_THREADS, - auto_terminate: true, - idletime: 0, - max_queue: 0, - fallback_policy: :abort # shouldn't matter -- 0 max queue + name: 'good_job', + min_threads: 0, + max_threads: MAX_THREADS, + auto_terminate: true, + idletime: 0, + max_queue: 0, + fallback_policy: :abort # shouldn't matter -- 0 max queue }.freeze def initialize(query = GoodJob::Job.all, **options) @query = query @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS) @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do idle_threads = @pool.max_length - @pool.length - puts "There are idle_threads: #{idle_threads}" create_thread if idle_threads.positive? - true end + @timer.add_observer(TimerObserver.new) @timer.execute end + def ordered_query + @query.where("scheduled_at < ?", Time.current).or(@query.where(scheduled_at: nil)).order(priority: :desc) + end + def execute end def shutdown(wait: true) if @timer.running? @@ -51,39 +52,34 @@ @pool.wait_for_termination if wait end end def create_thread - future = Concurrent::Future.new(args: [@query], executor: @pool) do |query| + future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query| Rails.application.executor.wrap do - thread_name = Thread.current.name || Thread.current.object_id - while job = query.with_advisory_lock.first - puts "Executing job #{job.id} in thread #{thread_name}" + while good_job = query.with_advisory_lock.first + ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job }) - JobWrapper.new(job).perform + JobWrapper.new(good_job).perform - job.advisory_unlock + good_job.advisory_unlock end - true end + true end - future.add_observer(TaskObserver.new(self)) + future.add_observer(TaskObserver.new) future.execute end - class TaskObserver - def initialize(scheduler) - @scheduler = scheduler + class TimerObserver + def update(time, result, error) + ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time }) end + end - def update(time, result, ex) - if result - puts "(#{time}) Execution successfully returned #{result}\n" - elsif ex.is_a?(Concurrent::TimeoutError) - puts "(#{time}) Execution timed out\n" - else - puts "(#{time}) Execution failed with error #{result} #{ex}\n" - end + class TaskObserver + def update(time, result, error) + ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time }) end end end end