module CanvasSync
  module JobBatches
    class Pool
      include RedisModel

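      # Lua script used to reserve concurrency slots. Based on how it is called in
      # refill_allotment, it is assumed to increment a Hash field only while the
      # value is below the given max, returning the pre-increment value.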
      HINCR_MAX = RedisScript.new(Pathname.new(__FILE__) + "../hincr_max.lua")

      attr_reader :pid
      redis_attr :description
      redis_attr :created_at
      redis_attr :concurrency, :int
      redis_attr :order
      redis_attr :on_failed_job, :symbol
      redis_attr :clean_when_empty, :bool

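      # Reopens an existing pool when a pool ID is given; otherwise creates a new one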
      def initialize(pool_id = nil, **kwargs)
        if pool_id
          @existing = true
          @pid = pool_id
        else
          @pid = SecureRandom.urlsafe_base64(10)
          initialize_new(**kwargs)
        end
      end

      def self.from_pid(pid)
        raise "PID must be given" unless pid.present?
        new(pid)
      end

      def <<(job_desc)
        add_job(job_desc)
      end

      def add_job(job_desc)
        add_jobs([job_desc])
      end

      def add_jobs(job_descs)
        job_descs.each do |job_desc|
          wrapper = Batch.new
          wrapper.description = "Pool Job Wrapper (PID: #{pid})"
          # With :wait, a failed job keeps holding its concurrency slot, so only
          # check back in on success; otherwise check in on completion
          checkin_event = (on_failed_job == :wait) ? :success : :complete
          wrapper.on(checkin_event, "#{self.class}.job_checked_in", pool_id: pid)
          # Flush the (still empty) wrapper batch to Redis; the actual job is
          # enqueued into it when popped from the pool in refill_allotment
          wrapper.jobs {}

          job_desc = job_desc.with_indifferent_access
          job_desc.merge!(
            job: job_desc[:job].to_s,
            pool_wrapper_batch: wrapper.bid,
          )

          push_job_to_pool(job_desc)
        end
        refill_allotment
      end

      # Holds the pool open so clean_when_empty does not tear it down while jobs
      # are still being enqueued. With a block, let_close! is called automatically
      # when the block exits.
      def keep_open!
        if block_given?
          begin
            # Re-enter without a block to set the keep_open flag
            keep_open!
            yield
          ensure
            let_close!
          end
        else
          redis do |r|
            r.hset(redis_key, 'keep_open', true)
          end
        end
      end

      def let_close!
        _, active_count = redis do |r|
          r.multi do
            r.hset(redis_key, 'keep_open', false)
            # Incrementing by 0 atomically reads the counter within the MULTI
            r.hincrby(redis_key, "active_count", 0)
          end
        end

        if active_count == 0 && pending_count == 0
          cleanup_redis if clean_when_empty
        end
      end

      def cleanup_redis
        Batch.logger.debug {"Cleaning redis of pool #{pid}"}
        redis do |r|
          r.zrem("pools", pid)
          r.unlink(
            redis_key,
            "#{redis_key}-jobs",
          )
        end
      end

      def active_count
        redis do |r|
          # Incrementing by 0 reads the counter (0 if the key does not exist)
          r.hincrby(redis_key, "active_count", 0)
        end
      end

      def pending_count
        jobs_key = "#{redis_key}-jobs"
        order = self.order || 'fifo'
        redis do |r|
          # The backing structure depends on the pool's order: a List for
          # FIFO/LIFO, a Set for :random, and a Sorted Set for :priority
          case order.to_sym
          when :fifo, :lifo
            r.llen(jobs_key)
          when :random
            r.scard(jobs_key)
          when :priority
            r.zcard(jobs_key)
          end
        end
      end

      def job_checked_in(status, options)
        active_count = redis do |r|
          # The pool may have already been cleaned up
          return unless r.exists?(redis_key)
          r.hincrby(redis_key, "active_count", -1)
        end

        added_count = refill_allotment
        if active_count == 0 && added_count == 0
          # Redis stores the keep_open flag as the string "true"/"false"
          if clean_when_empty && redis {|r| r.hget(redis_key, 'keep_open') } != 'true'
            cleanup_redis
          end
        end
      end

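      # Batch-callback entry point; resolves the pool from the callback options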
      def self.job_checked_in(status, options)
        pid = options['pool_id']
        from_pid(pid).job_checked_in(status, options)
      end

      protected

      def redis_key
        "POOLID-#{pid}"
      end

      def refill_allotment
        jobs_added = 0
        limit = concurrency.to_i
        redis do |r|
          loop do
            # Atomically reserve a concurrency slot, never exceeding the limit
            current_count = HINCR_MAX.call(r, [redis_key], ["active_count", limit]).to_i
            break unless current_count < limit

            job_desc = pop_job_from_pool
            if job_desc.present?
              # Run the job inside its wrapper batch so the check-in callback
              # fires when the job (and anything it batches) finishes
              Batch.new(job_desc['pool_wrapper_batch']).jobs do
                ChainBuilder.enqueue_job(job_desc)
              end
              jobs_added += 1
            else
              # Nothing left to run; release the slot we just reserved
              r.hincrby(redis_key, "active_count", -1)
              break
            end
          end
          r.expire(redis_key, Batch::BID_EXPIRE_TTL)
          r.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL)
        end
        jobs_added
      end

      def push_job_to_pool(job_desc)
        jobs_key = "#{redis_key}-jobs"
        # A random key allows duplicate jobs when a Redis Set is used (:random order)
        job_desc['_pool_random_key_'] = SecureRandom.urlsafe_base64(10)
        # Serialize through ActiveJob so non-JSON-native arguments survive the round trip
        job_json = JSON.unparse(ActiveJob::Arguments.serialize([job_desc]))
        order = self.order

        redis do |r|
          r.multi do
            case order.to_sym
            when :fifo, :lifo
              r.rpush(jobs_key, job_json)
            when :random
              r.sadd(jobs_key, job_json)
            when :priority
              r.zadd(jobs_key, job_desc[:priority] || 0, job_json)
            end
            r.expire(jobs_key, Batch::BID_EXPIRE_TTL)
          end
        end
      end

      def pop_job_from_pool
        jobs_key = "#{redis_key}-jobs"
        order = self.order

        job_json = nil
        redis do |r|
          job_json = case order.to_sym
          when :fifo
            r.lpop(jobs_key)
          when :lifo
            r.rpop(jobs_key)
          when :random
            r.spop(jobs_key)
          when :priority
            # ZPOPMAX returns a [member, score] pair; keep only the member
            r.zpopmax(jobs_key)&.first
          end
        end

        return nil unless job_json.present?

        ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0]
      end

      def self.redis(&blk)
        Batch.redis(&blk)
      end
      delegate :redis, to: :class

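      # Registers the pool in the global "pools" sorted set, scored by creation time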
      def flush_pending_attrs
        super
        redis do |r|
          r.zadd("pools", created_at, pid)
        end
      end

      private

      def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil)
        self.created_at = Time.now.utc.to_f
        self.description = description
        self.order = order
        self.concurrency = concurrency
        self.clean_when_empty = clean_when_empty
        self.on_failed_job = on_failed_job
        flush_pending_attrs
      end
    end
  end
end