# vim:fileencoding=utf-8

require 'rufus/scheduler'
require_relative 'scheduler/configuration'
require_relative 'scheduler/locking'
require_relative 'scheduler/logger_builder'
require_relative 'scheduler/signal_handling'
require_relative 'scheduler/failure_handler'

module ResqueAdmin
  module Scheduler
    autoload :Cli, 'resque-admin/scheduler/cli'
    autoload :Extension, 'resque-admin/scheduler/extension'
    autoload :Util, 'resque-admin/scheduler/util'
    autoload :VERSION, 'resque-admin/scheduler/version'

    private

    extend ResqueAdmin::Scheduler::Locking
    extend ResqueAdmin::Scheduler::Configuration
    extend ResqueAdmin::Scheduler::SignalHandling

    public

    class << self
      attr_writer :logger

      # the Rufus::Scheduler jobs that are scheduled
      attr_reader :scheduled_jobs

      # allow user to set an additional failure handler
      attr_writer :failure_handler

      # Schedule all jobs and continually look for delayed jobs (never returns)
      def run
        procline 'Starting'

        # trap signals
        register_signal_handlers

        # Quote from the resque/worker.
        # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
        # get output from the child in there.
        $stdout.sync = true
        $stderr.sync = true

        # Load the schedule into rufus
        # If dynamic is set, load that schedule otherwise use normal load
        if dynamic
          reload_schedule!
        else
          load_schedule!
        end

        begin
          @th = Thread.current

          # Now start the scheduling part of the loop.
          loop do
            begin
              if master?
                handle_delayed_items
                update_schedule if dynamic
              end
            rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e
              log! e.message
              release_master_lock
            end
            poll_sleep
          end

        rescue Interrupt
          log 'Exiting'
        end
      end

      def print_schedule
        if rufus_scheduler
          log! "Scheduling Info\tLast Run"
          scheduler_jobs = rufus_scheduler.jobs
          scheduler_jobs.each do |_k, v|
            log! "#{v.t}\t#{v.last}\t"
          end
        end
      end

      # Pulls the schedule from ResqueAdmin.schedule and loads it into the
      # rufus scheduler instance
      def load_schedule!
        procline 'Loading Schedule'

        # Need to load the schedule from redis for the first time if dynamic
        ResqueAdmin.reload_schedule! if dynamic

        log! 'Schedule empty! Set ResqueAdmin.schedule' if ResqueAdmin.schedule.empty?

        @scheduled_jobs = {}

        ResqueAdmin.schedule.each do |name, config|
          load_schedule_job(name, config)
        end
        ResqueAdmin.redis.del(:schedules_changed)
        procline 'Schedules Loaded'
      end

      # modify interval type value to value with options if options available
      def optionizate_interval_value(value)
        args = value
        if args.is_a?(::Array)
          return args.first if args.size > 2 || !args.last.is_a?(::Hash)
          # symbolize keys of hash for options
          args[2] = args[1].reduce({}) do |m, i|
            key, value = i
            m[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = value
            m
          end

          args[2][:job] = true
          args[1] = nil
        end
        args
      end

      # Loads a job schedule into the Rufus::Scheduler and stores it
      # in @scheduled_jobs
      def load_schedule_job(name, config)
        # If `rails_env` or `env` is set in the config, load jobs only if they
        # are meant to be loaded in `ResqueAdmin::Scheduler.env`.  If `rails_env` or
        # `env` is missing, the job should be scheduled regardless of the value
        # of `ResqueAdmin::Scheduler.env`.

        configured_env = config['rails_env'] || config['env']

        if configured_env.nil? || env_matches?(configured_env)
          log! "Scheduling #{name} "
          interval_defined = false
          interval_types = %w(cron every)
          interval_types.each do |interval_type|
            next unless !config[interval_type].nil? && !config[interval_type].empty?
            args = optionizate_interval_value(config[interval_type])
            args = [args, nil, job: true] if args.is_a?(::String)

            job = rufus_scheduler.send(interval_type, *args) do
              enqueue_recurring(name, config)
            end
            @scheduled_jobs[name] = job
            interval_defined = true
            break
          end
          unless interval_defined
            log! "no #{interval_types.join(' / ')} found for " \
                 "#{config['class']} (#{name}) - skipping"
          end
        else
          log "Skipping schedule of #{name} because configured " \
              "env #{configured_env.inspect} does not match current " \
              "env #{env.inspect}"
        end
      end

      # Returns true if the given schedule config hash matches the current env
      def rails_env_matches?(config)
        warn '`ResqueAdmin::Scheduler.rails_env_matches?` is deprecated. ' \
             'Please use `ResqueAdmin::Scheduler.env_matches?` instead.'
        config['rails_env'] && env &&
          config['rails_env'].split(/[\s,]+/).include?(env)
      end

      # Returns true if the current env is non-nil and the configured env
      # (which is a comma-split string) includes the current env.
      def env_matches?(configured_env)
        env && configured_env.split(/[\s,]+/).include?(env)
      end

      # Handles queueing delayed items
      # at_time - Time to start scheduling items (default: now).
      def handle_delayed_items(at_time = nil)
        timestamp = ResqueAdmin.next_delayed_timestamp(at_time)
        if timestamp
          procline 'Processing Delayed Items'
          until timestamp.nil?
            enqueue_delayed_items_for_timestamp(timestamp)
            timestamp = ResqueAdmin.next_delayed_timestamp(at_time)
          end
        end
      end

      def enqueue_next_item(timestamp)
        item = ResqueAdmin.next_item_for_timestamp(timestamp)

        if item
          log "queuing #{item['class']} [delayed]"
          enqueue(item)
        end

        item
      end

      # Enqueues all delayed jobs for a timestamp
      def enqueue_delayed_items_for_timestamp(timestamp)
        item = nil
        loop do
          handle_shutdown do
            # Continually check that it is still the master
            item = enqueue_next_item(timestamp) if master?
          end
          # continue processing until there are no more ready items in this
          # timestamp
          break if item.nil?
        end
      end

      def enqueue(config)
        enqueue_from_config(config)
      rescue => e
        ResqueAdmin::Scheduler.failure_handler.on_enqueue_failure(config, e)
      end

      def handle_shutdown
        exit if @shutdown
        yield
        exit if @shutdown
      end

      # Enqueues a job based on a config hash
      def enqueue_from_config(job_config)
        args = job_config['args'] || job_config[:args]

        klass_name = job_config['class'] || job_config[:class]
        begin
          klass = ResqueAdmin::Scheduler::Util.constantize(klass_name)
        rescue NameError
          klass = klass_name
        end

        params = args.is_a?(Hash) ? [args] : Array(args)
        queue = job_config['queue'] ||
                job_config[:queue] ||
                ResqueAdmin.queue_from_class(klass)
        # Support custom job classes like those that inherit from
        # ResqueAdmin::JobWithStatus (resque-status)
        job_klass = job_config['custom_job_class']
        if job_klass && job_klass != 'ResqueAdmin::Job'
          # The custom job class API must offer a static "scheduled" method. If
          # the custom job class can not be constantized (via a requeue call
          # from the web perhaps), fall back to enqueing normally via
          # ResqueAdmin::Job.create.
          begin
            ResqueAdmin::Scheduler::Util.constantize(job_klass).scheduled(
              queue, klass_name, *params
            )
          rescue NameError
            # Note that the custom job class (job_config['custom_job_class'])
            # is the one enqueued
            ResqueAdmin::Job.create(queue, job_klass, *params)
          end
        else
          # Hack to avoid havoc for people shoving stuff into queues
          # for non-existent classes (for example: running scheduler in
          # one app that schedules for another.
          if Class === klass
            ResqueAdmin::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
              klass, *params
            )

            # If the class is a custom job class, call self#scheduled on it.
            # This allows you to do things like ResqueAdmin.enqueue_at(timestamp,
            # CustomJobClass). Otherwise, pass off to ResqueAdmin.
            if klass.respond_to?(:scheduled)
              klass.scheduled(queue, klass_name, *params)
            else
              ResqueAdmin.enqueue_to(queue, klass, *params)
            end
          else
            # This will not run the before_hooks in rescue, but will at least
            # queue the job.
            ResqueAdmin::Job.create(queue, klass, *params)
          end
        end
      end

      def rufus_scheduler
        @rufus_scheduler ||= Rufus::Scheduler.new
      end

      # Stops old rufus scheduler and creates a new one.  Returns the new
      # rufus scheduler
      def clear_schedule!
        rufus_scheduler.stop
        @rufus_scheduler = nil
        @scheduled_jobs = {}
        rufus_scheduler
      end

      def reload_schedule!
        procline 'Reloading Schedule'
        clear_schedule!
        load_schedule!
      end

      def update_schedule
        if ResqueAdmin.redis.scard(:schedules_changed) > 0
          procline 'Updating schedule'
          loop do
            schedule_name = ResqueAdmin.redis.spop(:schedules_changed)
            break unless schedule_name
            ResqueAdmin.reload_schedule!
            if ResqueAdmin.schedule.keys.include?(schedule_name)
              unschedule_job(schedule_name)
              load_schedule_job(schedule_name, ResqueAdmin.schedule[schedule_name])
            else
              unschedule_job(schedule_name)
            end
          end
          procline 'Schedules Loaded'
        end
      end

      def unschedule_job(name)
        if scheduled_jobs[name]
          log "Removing schedule #{name}"
          scheduled_jobs[name].unschedule
          @scheduled_jobs.delete(name)
        end
      end

      # Sleeps and returns true
      def poll_sleep
        handle_shutdown do
          begin
            poll_sleep_loop
          ensure
            @sleeping = false
          end
        end
        true
      end

      def poll_sleep_loop
        @sleeping = true
        if poll_sleep_amount > 0
          start = Time.now
          loop do
            elapsed_sleep = (Time.now - start)
            remaining_sleep = poll_sleep_amount - elapsed_sleep
            @do_break = false
            if remaining_sleep <= 0
              @do_break = true
            else
              @do_break = handle_signals_with_operation do
                sleep(remaining_sleep)
              end
            end
            break if @do_break
          end
        else
          handle_signals_with_operation
        end
      end

      def handle_signals_with_operation
        yield if block_given?
        handle_signals
        false
      rescue Interrupt
        before_shutdown if @shutdown
        true
      end

      def stop_rufus_scheduler
        rufus_scheduler.shutdown(:wait)
        rufus_scheduler.join
      end

      def before_shutdown
        stop_rufus_scheduler
        release_master_lock
      end

      # Sets the shutdown flag, clean schedules and exits if sleeping
      def shutdown
        return if @shutdown
        @shutdown = true
        log!('Shutting down')
        @th.raise Interrupt if @sleeping
      end

      def log!(msg)
        logger.info { msg }
      end

      def log_error(msg)
        logger.error { msg }
      end

      def log(msg)
        logger.debug { msg }
      end

      def procline(string)
        log! string
        argv0 = build_procline(string)
        log "Setting procline #{argv0.inspect}"
        $0 = argv0
      end

      def failure_handler
        @failure_handler ||= ResqueAdmin::Scheduler::FailureHandler
      end

      def logger
        @logger ||= ResqueAdmin::Scheduler::LoggerBuilder.new(
          quiet: quiet,
          verbose: verbose,
          log_dev: logfile,
          format: logformat
        ).build
      end

      private

      def enqueue_recurring(name, config)
        if master?
          log! "queueing #{config['class']} (#{name})"
          ResqueAdmin.last_enqueued_at(name, Time.now.to_s)
          enqueue(config)
        end
      end

      def app_str
        app_name ? "[#{app_name}]" : ''
      end

      def env_str
        env ? "[#{env}]" : ''
      end

      def build_procline(string)
        "#{internal_name}#{app_str}#{env_str}: #{string}"
      end

      def internal_name
        "resque-scheduler-#{ResqueAdmin::Scheduler::VERSION}"
      end
    end
  end
end