# encoding: UTF-8

# Redis client based on RubyRedis, original work of Salvatore Sanfilippo
# http://github.com/antirez/redis/blob/4a327b4af9885d89b5860548f44569d1d2bde5ab/client-libraries/ruby_2/rubyredis.rb
#
# Some improvements where inspired by the Redis-rb library, including the testing suite.
# http://github.com/ezmobius/redis-rb/
require 'socket'

module Ohm
  begin
    if (RUBY_VERSION >= '1.9')
      require 'timeout'
      RedisTimer = Timeout
    else
      require 'system_timer'
      RedisTimer = SystemTimer
    end
  rescue LoadError
    RedisTimer = nil
  end

  class Redis
    class ProtocolError < RuntimeError
      def initialize(reply_type)
        super("Protocol error, got '#{reply_type}' as initial reply byte")
      end
    end

    BULK_COMMANDS = {
      :echo => true,
      :getset => true,
      :lpush => true,
      :lrem => true,
      :lset => true,
      :rpush => true,
      :sadd => true,
      :set => true,
      :setnx => true,
      :sismember => true,
      :smove => true,
      :srem => true,
      :zadd => true,
      :zincrby => true,
      :zrem => true,
      :zscore => true
    }

    MULTI_BULK_COMMANDS = {
      :mset => true,
      :msetnx => true
    }

    PROCESSOR_IDENTITY = lambda { |reply| reply }
    PROCESSOR_CONVERT_TO_BOOL = lambda { |reply| reply == 0 ? false : reply }
    PROCESSOR_SPLIT_KEYS = lambda { |reply| reply.respond_to?(:split) ? reply.split(" ") : reply }
    PROCESSOR_INFO = lambda { |reply| Hash[*(reply.lines.map { |l| l.chomp.split(":", 2) }.flatten)] }

    REPLY_PROCESSOR = {
      :exists => PROCESSOR_CONVERT_TO_BOOL,
      :sismember=> PROCESSOR_CONVERT_TO_BOOL,
      :sadd=> PROCESSOR_CONVERT_TO_BOOL,
      :srem=> PROCESSOR_CONVERT_TO_BOOL,
      :smove=> PROCESSOR_CONVERT_TO_BOOL,
      :zadd => PROCESSOR_CONVERT_TO_BOOL,
      :zrem => PROCESSOR_CONVERT_TO_BOOL,
      :move=> PROCESSOR_CONVERT_TO_BOOL,
      :setnx=> PROCESSOR_CONVERT_TO_BOOL,
      :del=> PROCESSOR_CONVERT_TO_BOOL,
      :renamenx=> PROCESSOR_CONVERT_TO_BOOL,
      :expire=> PROCESSOR_CONVERT_TO_BOOL,
      :keys => PROCESSOR_SPLIT_KEYS,
      :info => PROCESSOR_INFO
    }

    REPLY_PROCESSOR.send(:initialize) do |hash, key|
      hash[key] = PROCESSOR_IDENTITY
    end

    def initialize(options = {})
      @host = options[:host] || '127.0.0.1'
      @port = options[:port] || 6379
      @db = options[:db] || 0
      @timeout = options[:timeout] || 0
      @password = options[:password]
      connect
    end

    def version
      @version ||= info["redis_version"]
    end

    def support_mset?
      @support_mset.nil? ?
        @support_mset = version >= "1.05" :
        @support_mset
    end

    def to_s
      "Redis Client connected to #{@host}:#{@port} against DB #{@db}"
    end

    # Shorthand for getting all the elements in a list.
    def list(key)
      call_command([:lrange, key, 0, -1])
    end

    # We need to define type because otherwise it will escape method_missing.
    def type(key)
      call_command([:type, key])
    end

    def sort(key, opts = {})
      cmd = []
      cmd << "SORT #{key}"
      cmd << "BY #{opts[:by]}" if opts[:by]
      cmd << "GET #{[opts[:get]].flatten * ' GET '}" if opts[:get]
      cmd << "#{opts[:order]}" if opts[:order]
      cmd << "LIMIT #{Array(opts[:limit]).join(' ')}" if opts[:limit]
      call_command(cmd)
    end

  private

    def connect
      connect_to(@host, @port, @timeout == 0 ? nil : @timeout)
      call_command([:auth, @password]) if @password
      call_command([:select, @db]) if @db != 0
      @sock
    end

    def connect_to(host, port, timeout = nil)

      # We support connect() timeout only if system_timer is availabe or
      # if we are running against Ruby >= 1.9. Timeout reading from the
      # socket instead will be supported anyway.
      if @timeout != 0 and RedisTimer
        begin
          @sock = TCPSocket.new(host, port)
        rescue Timeout::Error
          @sock = nil
          raise Timeout::Error, "Timeout connecting to the server"
        end
      else
        @sock = TCPSocket.new(host, port)
      end

      @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

      # If the timeout is set we configure the low level socket options in
      # order to make sure a blocking read will return after the specified
      # number of seconds. This hack is from the Memcached Ruby client.
      if timeout
        secs = Integer(timeout)
        usecs = Integer((timeout - secs) * 1_000_000)
        optval = [secs, usecs].pack("l_2")
        @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
        @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
      end
    rescue Errno::ECONNREFUSED
      raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
    end

    def connected?
      !! @sock
    end

    def disconnect
      @sock.close
      @sock = nil
      true
    end

    def reconnect
      disconnect and connect
    end

    def method_missing(*argv)
      call_command(argv)
    end

    # Wrap raw_call_command to handle reconnection on socket error. We
    # try to reconnect just one time, otherwise let the error araise.
    def call_command(argv)
      connect unless connected?
      raw_call_command(argv.dup)
    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
      if reconnect
        raw_call_command(argv.dup)
      else
        raise Errno::ECONNRESET
      end
    end

    def raw_call_command(argv)
      bulk_command?(argv) ?
        process_bulk_command(argv) :
        multi_bulk_command?(argv) ?
          process_multi_bulk_command(argv) :
          process_command(argv)
      process_reply(argv[0])
    end

    def process_command(argv)
      @sock.write("#{argv.join(" ")}\r\n")
    end

    def process_bulk_command(argv)
      bulk = argv.pop.to_s
      argv.push ssize(bulk)
      @sock.write("#{argv.join(" ")}\r\n")
      @sock.write("#{bulk}\r\n")
    end

    def process_multi_bulk_command(argv)
      params = argv.pop.to_a.flatten
      params.unshift(argv[0])

      command = ["*#{params.size}"]
      params.each do |param|
        param = param.to_s
        command << "$#{ssize(param)}"
        command << param
      end

      @sock.write(command.map { |cmd| "#{cmd}\r\n"}.join)
    end

    def bulk_command?(argv)
      BULK_COMMANDS[argv[0]] and argv.length > 1
    end

    def multi_bulk_command?(argv)
      MULTI_BULK_COMMANDS[argv[0]]
    end

    def process_reply(command)
      REPLY_PROCESSOR[command][read_reply]
    end

    def read_reply

      # We read the first byte using read() mainly because gets() is
      # immune to raw socket timeouts.
      begin
        reply_type = @sock.read(1)
      rescue Errno::EAGAIN

        # We want to make sure it reconnects on the next command after the
        # timeout. Otherwise the server may reply in the meantime leaving
        # the protocol in a desync status.
        @sock = nil
        raise Errno::EAGAIN, "Timeout reading from the socket"
      end

      raise Errno::ECONNRESET, "Connection lost" unless reply_type

      format_reply(reply_type, @sock.gets)
    end

    def format_reply(reply_type, line)
      case reply_type
      when "-" then format_error_reply(line)
      when "+" then format_status_reply(line)
      when ":" then format_integer_reply(line)
      when "$" then format_bulk_reply(line)
      when "*" then format_multi_bulk_reply(line)
      else raise ProtocolError.new(reply_type)
      end
    end

    def format_error_reply(line)
      raise "-" + line.strip
    end

    def format_status_reply(line)
      line.strip
    end

    def format_integer_reply(line)
      line.to_i
    end

    def format_bulk_reply(line)
      bulklen = line.to_i
      return if bulklen == -1
      reply = @sock.read(bulklen)
      @sock.read(2) # Discard CRLF.

      reply.respond_to?(:force_encoding) ?
        reply.force_encoding("UTF-8") :
        reply
    end

    def format_multi_bulk_reply(line)
      reply = []
      line.to_i.times { reply << read_reply }
      reply
    end

    def ssize(string)
      string.respond_to?(:bytesize) ? string.bytesize : string.size
    end
  end
end