# frozen_string_literal: true

class Redis
  class Pipeline
    attr_accessor :db
    attr_reader :client

    attr :futures

    def initialize(client)
      @client = client.is_a?(Pipeline) ? client.client : client
      @with_reconnect = true
      @shutdown = false
      @futures = []
    end

    def timeout
      client.timeout
    end

    def with_reconnect?
      @with_reconnect
    end

    def without_reconnect?
      !@with_reconnect
    end

    def shutdown?
      @shutdown
    end

    def empty?
      @futures.empty?
    end

    def call(command, timeout: nil, &block)
      # A pipeline that contains a shutdown should not raise ECONNRESET when
      # the connection is gone.
      @shutdown = true if command.first == :shutdown
      future = Future.new(command, block, timeout)
      @futures << future
      future
    end

    def call_with_timeout(command, timeout, &block)
      call(command, timeout: timeout, &block)
    end

    def call_pipeline(pipeline)
      @shutdown = true if pipeline.shutdown?
      @futures.concat(pipeline.futures)
      @db = pipeline.db
      nil
    end

    def commands
      @futures.map(&:_command)
    end

    def timeouts
      @futures.map(&:timeout)
    end

    def with_reconnect(val = true)
      @with_reconnect = false unless val
      yield
    end

    def without_reconnect(&blk)
      with_reconnect(false, &blk)
    end

    def finish(replies, &blk)
      if blk
        futures.each_with_index.map do |future, i|
          future._set(blk.call(replies[i]))
        end
      else
        futures.each_with_index.map do |future, i|
          future._set(replies[i])
        end
      end
    end

    class Multi < self
      def finish(replies)
        exec = replies.last

        return if exec.nil? # The transaction failed because of WATCH.

        # EXEC command failed.
        raise exec if exec.is_a?(CommandError)

        if exec.size < futures.size
          # Some command wasn't recognized by Redis.
          command_error = replies.detect { |r| r.is_a?(CommandError) }
          raise command_error
        end

        super(exec) do |reply|
          # Because an EXEC returns nested replies, hiredis won't be able to
          # convert an error reply to a CommandError instance itself. This is
          # specific to MULTI/EXEC, so we solve this here.
          reply.is_a?(::RuntimeError) ? CommandError.new(reply.message) : reply
        end
      end

      def timeouts
        if empty?
          []
        else
          [nil, *super, nil]
        end
      end

      def commands
        if empty?
          []
        else
          [[:multi]] + super + [[:exec]]
        end
      end
    end
  end

  class FutureNotReady < RuntimeError
    def initialize
      super("Value will be available once the pipeline executes.")
    end
  end

  class Future < BasicObject
    FutureNotReady = ::Redis::FutureNotReady.new

    attr_reader :timeout

    def initialize(command, transformation, timeout)
      @command = command
      @transformation = transformation
      @timeout = timeout
      @object = FutureNotReady
    end

    def ==(_other)
      message = +"The methods == and != are deprecated for Redis::Future and will be removed in 4.2.0"
      message << " - You probably meant to call .value == or .value !="
      message << " (#{::Kernel.caller(1, 1).first})\n"

      ::Kernel.warn(message)

      super
    end

    def inspect
      "<Redis::Future #{@command.inspect}>"
    end

    def _set(object)
      @object = @transformation ? @transformation.call(object) : object
      value
    end

    def _command
      @command
    end

    def value
      ::Kernel.raise(@object) if @object.is_a?(::RuntimeError)
      @object
    end

    def is_a?(other)
      self.class.ancestors.include?(other)
    end

    def class
      Future
    end
  end
end