require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"

module GoodJob
  class Scheduler
    # Default options for the polling timer. They are merged with the
    # +timer_options+ passed to #initialize and handed to
    # Concurrent::TimerTask in #create_pools.
    DEFAULT_TIMER_OPTIONS = {
      # Also reported as +poll_interval+ in instrumentation; the timer is only
      # created when this value is positive.
      execution_interval: 1,
      timeout_interval: 1,
      run_now: true,
    }.freeze

    # Default options for the worker pool. They are merged with the
    # +pool_options+ passed to #initialize and handed to ThreadPoolExecutor
    # in #create_pools.
    DEFAULT_POOL_OPTIONS = {
      name: 'good_job',
      min_threads: 0,
      # Evaluated once, when this constant is defined.
      max_threads: Concurrent.processor_count,
      auto_terminate: true,
      idletime: 60,
      # NOTE(review): presumably chosen so that tasks are never queued and are
      # discarded instead when no worker is free (0 would mean "unbounded" in
      # concurrent-ruby) -- confirm against the executor documentation.
      max_queue: -1,
      fallback_policy: :discard,
    }.freeze

    # Class-level list of schedulers; each instance appends itself in
    # #initialize. No instance-level reader is generated.
    cattr_reader :instances, default: [], instance_reader: false

    # Builds a scheduler around +performer+, registers it in the class-level
    # +instances+ list and immediately creates its pools.
    #
    # @param performer [#next] object asked for work; must respond to +next+
    # @param timer_options [Hash] overrides merged over DEFAULT_TIMER_OPTIONS
    # @param pool_options [Hash] overrides merged over DEFAULT_POOL_OPTIONS
    # @raise [ArgumentError] when +performer+ does not respond to +next+
    def initialize(performer, timer_options: {}, pool_options: {})
      unless performer.respond_to?(:next)
        raise ArgumentError, "Performer argument must implement #next"
      end

      # Registration happens before any further setup, right after validation.
      self.class.instances.push(self)

      @performer = performer
      @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
      @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)

      create_pools
    end

    # Stops the polling timer and then the worker pool.
    #
    # The scheduler is flagged as shut down first; then
    # "scheduler_shutdown_start.good_job" is published and the actual stop
    # runs inside a "scheduler_shutdown.good_job" instrumentation block.
    #
    # @param wait [Boolean] when truthy, +wait_for_termination+ is called on
    #   each executor that was stopped
    def shutdown(wait: true)
      @_shutdown = true

      # Stops a single executor; a missing or non-running one is left alone.
      halt = lambda do |executor|
        next unless executor&.running?

        executor.shutdown
        executor.wait_for_termination if wait
      end

      ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id })
      ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do
        halt.call(@timer)
        halt.call(@pool)
      end
    end

    # Whether the scheduler is currently flagged as shut down.
    #
    # @return [Boolean] always +true+ or +false+
    def shutdown?
      # The flag is never assigned before the first #shutdown, so it reads as
      # nil on a fresh scheduler; normalise it so callers get a real boolean.
      @_shutdown ? true : false
    end

    # Shuts the scheduler down (unless it already is) and creates fresh pools,
    # all inside a "scheduler_restart_pools.good_job" instrumentation block.
    #
    # @param wait [Boolean] forwarded to #shutdown
    # @return [Object] whatever #create_pools returns
    def restart(wait: true)
      ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do
        shutdown(wait: wait) unless shutdown?
        result = create_pools

        # The new pools are live again. Clear the flag so #shutdown? is
        # accurate and so a later #restart stops these pools first instead of
        # skipping #shutdown and orphaning the running timer and pool.
        @_shutdown = false
        result
      end
    end

    # Hands one unit of work to the pool when it reports a ready worker.
    #
    # The posted task calls +next+ on the performer inside the Rails executor;
    # its outcome is delivered to #task_observer.
    #
    # @return [false, Object] +false+ when the pool reports no ready worker,
    #   otherwise the result of executing the future
    def create_thread
      has_capacity = @pool.ready_worker_count.positive?
      return false unless has_capacity

      # The result is captured in a local so the task returns the performer's
      # value regardless of what the executor wrapper itself returns.
      work = proc do |performer|
        result = nil
        Rails.application.executor.wrap { result = performer.next }
        result
      end

      future = Concurrent::Future.new(args: [@performer], executor: @pool, &work)
      future.add_observer(self, :task_observer)
      future.execute
    end

    # Observer callback registered on the polling timer.
    #
    # Passes a thread error to GoodJob.on_thread_error when that handler is
    # callable, then publishes "finished_timer_task.good_job".
    #
    # @param time [Object] reported as +:time+ in the notification payload
    # @param executed_task [Object] reported as +:result+
    # @param thread_error [Exception, nil] reported as +:error+
    def timer_observer(time, executed_task, thread_error)
      if thread_error
        error_handler = GoodJob.on_thread_error
        error_handler.call(thread_error) if error_handler.respond_to?(:call)
      end

      ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time })
    end

    # Observer callback registered on each job future.
    #
    # Passes a thread error to GoodJob.on_thread_error when that handler is
    # callable, publishes "finished_job_task.good_job", and requests another
    # thread when the finished task produced a truthy output.
    #
    # @param time [Object] reported as +:time+ in the notification payload
    # @param output [Object] the task's result, reported as +:result+
    # @param thread_error [Exception, nil] reported as +:error+
    def task_observer(time, output, thread_error)
      if thread_error
        error_handler = GoodJob.on_thread_error
        error_handler.call(thread_error) if error_handler.respond_to?(:call)
      end

      payload = { result: output, error: thread_error, time: time }
      ActiveSupport::Notifications.instrument("finished_job_task.good_job", payload)
      return unless output

      # A truthy output triggers an immediate follow-up attempt.
      create_thread
    end

    # Thread pool that can report how many workers could pick up a task now.
    class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
      # Workers the pool may still create plus workers already created and
      # waiting for work, computed under the executor's lock.
      #
      # Reads instance variables owned by the parent executor; background:
      # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
      #
      # @return [Integer]
      def ready_worker_count
        synchronize { (@max_length - @pool.length) + @ready.length }
      end
    end

    private

    # Creates the worker pool and, when polling is enabled, the timer that
    # periodically calls #create_thread. Both are built inside a
    # "scheduler_create_pools.good_job" instrumentation block.
    def create_pools
      poll_interval = @timer_options[:execution_interval]
      payload = {
        performer_name: @performer.name,
        max_threads: @pool_options[:max_threads],
        poll_interval: poll_interval,
        process_id: process_id,
      }

      ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", payload) do
        @pool = ThreadPoolExecutor.new(@pool_options)

        # A zero or negative interval means no timer is started.
        if poll_interval.positive?
          @timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
          @timer.add_observer(self, :timer_observer)
          @timer.execute
        end
      end
    end

    # PID of the current process; included in the instrumentation payloads
    # published by this class.
    #
    # @return [Integer]
    def process_id
      Process.pid
    end

    # Label for the calling thread: its name when one is set, otherwise its
    # object id.
    #
    # @return [String, Integer]
    def thread_name
      current = Thread.current
      current.name || current.object_id
    end
  end
end