require 'rubygems'
require 'resque'
require 'resque_scheduler/version'
require 'resque_scheduler/util'
require 'resque/scheduler'
require 'resque_scheduler/plugin'

module ResqueScheduler
  autoload :Cli, 'resque_scheduler/cli'

  #
  # Accepts a new schedule configuration of the form:
  #
  #   {
  #     "MakeTea" => {
  #       "every" => "1m" },
  #     "some_name" => {
  #       "cron"        => "*/5 * * * *",
  #       "class"       => "DoSomeWork",
  #       "args"        => "work on this string",
  #       "description" => "this thing works it's butter off" },
  #     ...
  #   }
  #
  # Hash keys can be anything and are used to describe and reference
  # the scheduled job. If the "class" argument is missing, the key
  # is used implicitly as "class" argument - in the "MakeTea" example,
  # "MakeTea" is used both as job name and resque worker class.
  #
  # Any jobs that were in the old schedule, but are not
  # present in the new schedule, will be removed.
  #
  # :cron can be any cron scheduling string
  #
  # :every can be used in lieu of :cron. see rufus-scheduler's 'every' usage
  # for valid syntax. If :cron is present it will take precedence over :every.
  #
  # :class must be a resque worker class. If it is missing, the job name (hash key)
  # will be used as :class.
  #
  # :args can be any yaml which will be converted to a ruby literal and
  # passed in as params. (optional)
  #
  # :rails_envs is the list of envs where the job gets loaded. Envs are
  # comma separated (optional)
  #
  # :description is just that, a description of the job (optional).
  #
  # If :args is an array, each element in the array is passed as a separate
  # param, otherwise :args is passed in as the only parameter to perform.
  def schedule=(schedule_hash)
    # Start from a clean slate: drop whatever non-persisted schedules are
    # currently stored in redis.
    clean_schedules

    # Normalise the incoming hash, then write every entry to redis so the
    # schedule can be read back from any process.
    prepare_schedule(schedule_hash).each_pair { |name, job_spec| set_schedule(name, job_spec) }

    # Re-read from redis so the memoised copy only holds what was stored.
    reload_schedule!
  end

  # Returns the schedule hash
  def schedule
    # Memoise the redis lookup. get_schedules yields nil when nothing is
    # stored, in which case callers still get an (unmemoised) empty hash.
    @schedule ||= get_schedules
    @schedule || {}
  end

  # reloads the schedule from redis
  def reload_schedule!
    # Replace the memoised schedule with whatever get_schedules reports now.
    fresh = get_schedules
    @schedule = fresh
  end

  # gets the schedules as it exists in redis
  def get_schedules
    # No :schedules key in redis at all: report that as nil.
    return nil unless redis.exists(:schedules)

    # Swap every stored (encoded) config for its decoded form, in place,
    # keeping the job names as keys.
    stored = redis.hgetall(:schedules)
    stored.keys.each { |name| stored[name] = decode(stored[name]) }
    stored
  end

  # clean the schedules as it exists in redis, useful for first setup?
  def clean_schedules
    stored_names = redis.exists(:schedules) ? redis.hkeys(:schedules) : []

    # Schedules flagged as persisted are kept; everything else is removed.
    stored_names.each do |name|
      next if schedule_persisted?(name)
      remove_schedule(name)
    end

    # Forget the memoised schedule so the next read goes back to redis.
    @schedule = nil
    true
  end

  # Create or update a schedule with the provided name and configuration.
  #
  # Note: values for class and custom_job_class need to be strings,
  # not constants.
  #
  #    Resque.set_schedule('some_job', {:class => 'SomeJob',
  #                                     :every => '15mins',
  #                                     :queue => 'high',
  #                                     :args => '/tmp/poop'})
  def set_schedule(name, config)
    stored_config = get_schedule(name)

    # The persist flag is bookkeeping, not part of the stored config. It is
    # stripped from the hash the caller passed in (the argument is mutated).
    persist = config.delete(:persist) || config.delete('persist')

    # Nothing to write when redis already holds an identical config.
    return config if stored_config && stored_config == config

    redis.pipelined do
      redis.hset(:schedules, name, encode(config))
      redis.sadd(:schedules_changed, name)
      redis.sadd(:persisted_schedules, name) if persist
    end
    config
  end

  # retrieve the schedule configuration for the given name
  def get_schedule(name)
    # Read the raw entry from the :schedules hash, then decode it.
    raw_config = redis.hget(:schedules, name)
    decode(raw_config)
  end

  def schedule_persisted?(name)
    # Membership test against the :persisted_schedules set.
    redis.sismember :persisted_schedules, name
  end

  # remove a given schedule by name
  def remove_schedule(name)
    # One pipeline for the three writes: drop the stored config, clear the
    # persisted marker, and record the name in :schedules_changed.
    redis.pipelined do
      redis.hdel :schedules, name
      redis.srem :persisted_schedules, name
      redis.sadd :schedules_changed, name
    end
  end

  # This method is nearly identical to +enqueue+ only it also
  # takes a timestamp which will be used to schedule the job
  # for queueing.  Until timestamp is in the past, the job will
  # sit in the schedule list.
  def enqueue_at(timestamp, klass, *args)
    validate(klass)

    # Use the queue the job class itself maps to, then delegate.
    target_queue = queue_from_class(klass)
    enqueue_at_with_queue(target_queue, timestamp, klass, *args)
  end

  # Identical to +enqueue_at+, except you can also specify
  # a queue in which the job will be placed after the
  # timestamp has passed. It respects Resque.inline option, by
  # creating the job right away instead of adding to the queue.
  def enqueue_at_with_queue(queue, timestamp, klass, *args)
    # A falsy result from the before-schedule hooks vetoes the enqueue.
    return false unless Plugin.run_before_schedule_hooks(klass, *args)

    # Inline mode, or a timestamp already in the past, means there is
    # nothing to delay: the job is created immediately.
    due_now = Resque.inline? || timestamp.to_i < Time.now.to_i

    if due_now && klass.respond_to?(:scheduled)
      # Custom job classes take over via their own .scheduled hook, which
      # allows calls like Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
      klass.scheduled(queue, klass.to_s, *args)
    elsif due_now
      Resque::Job.create(queue, klass, *args)
    else
      delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
    end

    Plugin.run_after_schedule_hooks(klass, *args)
  end

  # Identical to enqueue_at but takes number_of_seconds_from_now
  # instead of a timestamp.
  def enqueue_in(number_of_seconds_from_now, klass, *args)
    # Turn the relative delay into an absolute time and delegate.
    run_at = Time.now + number_of_seconds_from_now
    enqueue_at(run_at, klass, *args)
  end

  # Identical to +enqueue_in+, except you can also specify
  # a queue in which the job will be placed after the
  # number of seconds has passed.
  def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
    # Turn the relative delay into an absolute time and delegate.
    run_at = Time.now + number_of_seconds_from_now
    enqueue_at_with_queue(queue, run_at, klass, *args)
  end

  # Used internally to stuff the item into the schedule sorted list.
  # +timestamp+ can be either in seconds or a datetime object
  # Insertion is O(log(n)).
  # Returns true if it's the first job to be scheduled at that time, else false
  def delayed_push(timestamp, item)
    # Serialise the item and normalise the timestamp once, so the list
    # entry and the reverse-index key are built from the same values.
    encoded_item = encode(item)
    score = timestamp.to_i
    bucket = "delayed:#{score}"

    # First add this item to the list for this timestamp.
    redis.rpush(bucket, encoded_item)

    # Record which timestamp lists this item occurs in, so it can be found
    # again from the job payload alone.
    redis.sadd("timestamps:#{encoded_item}", bucket)

    # Now add this timestamp to the sorted set. The score and the value are
    # the same since we query by timestamp and have nothing else to store.
    # The reply of this call is the method's return value.
    redis.zadd :delayed_queue_schedule, score, score
  end

  # Returns an array of timestamps based on start and count
  def delayed_queue_peek(start, count)
    # zrange takes an inclusive end index, hence the -1.
    last_index = start + count - 1
    raw_timestamps = redis.zrange(:delayed_queue_schedule, start, last_index)
    Array(raw_timestamps).map(&:to_i)
  end

  # Returns the size of the delayed queue schedule
  def delayed_queue_schedule_size
    # Cardinality of the sorted set that holds the scheduled timestamps.
    redis.zcard(:delayed_queue_schedule)
  end

  # Returns the number of jobs for a given timestamp in the delayed queue schedule
  def delayed_timestamp_size(timestamp)
    # Length of the list that holds the jobs for this timestamp.
    bucket = "delayed:#{timestamp.to_i}"
    redis.llen(bucket).to_i
  end

  # Returns an array of delayed items for the given timestamp
  def delayed_timestamp_peek(timestamp, start, count)
    result = list_range("delayed:#{timestamp.to_i}", start, count)
    return result unless count == 1

    # For a count of 1 the list_range result is treated as a single item
    # (or nil), so wrap it to keep the return value an array.
    result.nil? ? [] : [result]
  end

  # Returns the next delayed queue timestamp
  # (don't call directly)
  def next_delayed_timestamp(at_time=nil)
    # Anything scored at or before the cutoff (default: now) is due; only
    # the first such entry is requested.
    cutoff = (at_time || Time.now).to_i
    items = redis.zrangebyscore(:delayed_queue_schedule, '-inf', cutoff, :limit => [0, 1])

    earliest = Array(items).first
    earliest.nil? ? nil : earliest.to_i
  end

  # Returns the next item to be processed for a given timestamp, nil if
  # done. (don't call directly)
  # +timestamp+ can either be in seconds or a datetime
  def next_item_for_timestamp(timestamp)
    bucket = "delayed:#{timestamp.to_i}"

    # Pop the oldest entry and drop this bucket from the entry's
    # reverse index of timestamps.
    raw_item = redis.lpop(bucket)
    redis.srem("timestamps:#{raw_item}", bucket)

    # Decode first, then tidy up the bucket if it is now empty.
    decode(raw_item).tap { clean_up_timestamp(bucket, timestamp) }
  end

  # Clears all jobs created with enqueue_at or enqueue_in
  def reset_delayed_queue
    scheduled_timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))

    scheduled_timestamps.each do |timestamp|
      bucket = "delayed:#{timestamp}"

      # Delete the reverse-index set of every job in this bucket...
      encoded_jobs = redis.lrange(bucket, 0, -1)
      redis.pipelined do
        encoded_jobs.each { |encoded_job| redis.del("timestamps:#{encoded_job}") }
      end

      # ...then the bucket itself.
      redis.del(bucket)
    end

    # Finally drop the sorted set of timestamps.
    redis.del(:delayed_queue_schedule)
  end

  # Given a job class and its arguments, remove every matching job from the
  # delayed queue. Returns the number of jobs removed.
  def remove_delayed(klass, *args)
    search = encode(job_to_hash(klass, args))
    index_key = "timestamps:#{search}"
    buckets = redis.smembers(index_key)

    replies = redis.pipelined do
      buckets.each do |bucket|
        redis.lrem(bucket, 0, search)
        redis.srem(index_key, bucket)
      end
    end

    return 0 if replies.nil? || replies.empty?

    # Replies come back as lrem/srem pairs; only the lrem reply (the first
    # of each pair) is summed.
    replies.each_slice(2).map(&:first).inject(:+)
  end

  # Given a job class and its arguments, remove the matching delayed jobs
  # and enqueue each of them now.
  def enqueue_delayed(klass, *args)
    job = job_to_hash(klass, args)

    # Enqueue once per delayed copy that was removed.
    removed_count = remove_delayed(klass, *args)
    removed_count.times { Resque::Scheduler.enqueue_from_config(job) }
  end

  # Given a block, remove jobs that return true from a block
  #
  # This allows for removal of delayed jobs that have arguments matching certain criteria
  def remove_delayed_selection
    fail ArgumentError, "Please supply a block" unless block_given?

    destroyed = 0
    # There is no way to search Redis list entries for a partial match, so
    # query for all delayed job lists and match after decoding each payload.
    # The block receives the decoded payload's 'args' value.
    redis.keys("delayed:*").each do |job|
      # Walk from the tail towards the head: removals then only ever shift
      # entries that are still waiting to be visited.
      index = redis.llen(job) - 1
      while index >= 0
        payload = redis.lindex(job, index)

        # The entry may be gone by the time it is read (the list can shrink
        # underneath us); skip it instead of failing on a nil payload.
        if payload.nil?
          index -= 1
          next
        end

        if yield(decode(payload)['args'])
          removed = redis.lrem(job, 0, payload)
          destroyed += removed
          # lrem removes every copy of the payload, so step past all of
          # them, and always move by at least one so the loop terminates
          # even when nothing was removed.
          index -= [removed, 1].max
        else
          index -= 1
        end
      end
    end
    destroyed
  end

  # Given a timestamp and job (klass + args) it removes all instances and
  # returns the count of jobs removed.
  #
  # O(N) where N is the number of jobs scheduled to fire at the given
  # timestamp
  def remove_delayed_job_from_timestamp(timestamp, klass, *args)
    bucket = "delayed:#{timestamp.to_i}"
    encoded_job = encode(job_to_hash(klass, args))

    # Drop this bucket from the job's reverse index, then remove every copy
    # of the job from the bucket. The lrem reply is the return value; the
    # bucket is tidied up afterwards in case it is now empty.
    redis.srem("timestamps:#{encoded_job}", bucket)
    redis.lrem(bucket, 0, encoded_job).tap { clean_up_timestamp(bucket, timestamp) }
  end

  def count_all_scheduled_jobs
    # Sum the list lengths of every timestamp in the delayed schedule.
    scheduled_timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
    scheduled_timestamps.inject(0) do |total, timestamp|
      total + redis.llen("delayed:#{timestamp}").to_i
    end
  end

  # Returns delayed jobs schedule timestamp for +klass+, +args+.
  def scheduled_at(klass, *args)
    search = encode(job_to_hash(klass, args))

    # Each member is a "delayed:<timestamp>" list key. Strip exactly that
    # prefix: String#tr would instead delete every occurrence of the
    # characters d, e, l, a, y and ':' anywhere in the key.
    redis.smembers("timestamps:#{search}").map do |key|
      key.sub(/\Adelayed:/, '').to_i
    end
  end

  private

  def job_to_hash(klass, args)
    # Same payload shape as job_to_hash_with_queue, with the queue taken
    # from the job class.
    class_name = klass.to_s
    { :class => class_name, :args => args, :queue => queue_from_class(klass) }
  end

  def job_to_hash_with_queue(queue, klass, args)
    # Payload stored for a delayed job: class name as a string, the
    # argument list, and the explicitly chosen queue.
    class_name = klass.to_s
    { :class => class_name, :args => args, :queue => queue }
  end

  def clean_up_timestamp(key, timestamp)
    # Watch the list so that nobody can add jobs to this delayed queue
    # unnoticed while it is being removed.
    redis.watch(key)

    if redis.llen(key).to_i.zero?
      # The list is empty: remove it and its entry in the schedule together.
      redis.multi do
        redis.del(key)
        redis.zrem(:delayed_queue_schedule, timestamp.to_i)
      end
    else
      # Still has jobs: nothing to delete, so release the watch.
      redis.unwatch
    end
  end

  def prepare_schedule(schedule_hash)
    # Build a new hash of copied job specs so the caller's specs are left
    # alone. A spec with neither a 'class' nor a :class key gets its own
    # name as 'class'.
    schedule_hash.each_with_object({}) do |(name, job_spec), prepared|
      spec = job_spec.dup
      has_class = spec.key?('class') || spec.key?(:class)
      spec['class'] = name unless has_class
      prepared[name] = spec
    end
  end
end

# Mix the methods defined in ResqueScheduler into Resque itself, so the public
# ones can be called directly on it, e.g. Resque.set_schedule(...) or
# Resque.enqueue_at(...).
Resque.extend ResqueScheduler