lib/redis.rb in ezmobius-redis-rb-0.0.3 vs lib/redis.rb in ezmobius-redis-rb-0.1

- old
+ new

@@ -1,472 +1,297 @@ require 'socket' -require 'set' -require File.join(File.dirname(__FILE__),'server') +require File.join(File.dirname(__FILE__),'pipeline') - -class RedisError < StandardError +begin + if RUBY_VERSION >= '1.9' + require 'timeout' + RedisTimer = Timeout + else + require 'system_timer' + RedisTimer = SystemTimer + end +rescue LoadError + RedisTimer = nil end -class RedisRenameError < StandardError -end + class Redis - ERR = "-".freeze - OK = 'OK'.freeze - SINGLE = '+'.freeze - BULK = '$'.freeze - MULTI = '*'.freeze - INT = ':'.freeze - - attr_reader :server - - - def initialize(opts={}) - @opts = {:host => 'localhost', :port => '6379', :db => 0}.merge(opts) - $debug = @opts[:debug] - @db = @opts[:db] - @server = Server.new(@opts[:host], @opts[:port]) - end - - def to_s - "#{host}:#{port}" - end - - def port - @opts[:port] - end - - def host - @opts[:host] - end - - def with_socket_management(server, &block) - begin - block.call(server.socket) - #Timeout or server down - rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNREFUSED => e - server.close - puts "Client (#{server.inspect}) disconnected from server: #{e.inspect}\n" if $debug - retry - #Server down - rescue NoMethodError => e - puts "Client (#{server.inspect}) tryin server that is down: #{e.inspect}\n Dying!" if $debug - raise Errno::ECONNREFUSED - #exit - end - end + OK = "OK".freeze + MINUS = "-".freeze + PLUS = "+".freeze + COLON = ":".freeze + DOLLAR = "$".freeze + ASTERISK = "*".freeze - def monitor - with_socket_management(@server) do |socket| - trap("INT") { puts "\nGot ^C! Dying!"; exit } - write "MONITOR\r\n" - puts "Now Monitoring..." - socket.read(12) - loop do - x = socket.gets - puts x unless x.nil? - end - end - end + BULK_COMMANDS = { + "set" => true, + "setnx" => true, + "rpush" => true, + "lpush" => true, + "lset" => true, + "lrem" => true, + "sadd" => true, + "srem" => true, + "sismember" => true, + "echo" => true, + "getset" => true, + "smove" => true + } - def quit - write "QUIT\r\n" - end - - def select_db(index) - @db = index - write "SELECT #{index}\r\n" - get_response - end - - def flush_db - write "FLUSHDB\r\n" - get_response == OK - end + BOOLEAN_PROCESSOR = lambda{|r| r == 0 ? false : r} - def flush_all - ensure_retry do - puts "Warning!\nFlushing *ALL* databases!\n5 Seconds to Hit ^C!" - trap('INT') {quit; return false} - sleep 5 - write "FLUSHALL\r\n" - get_response == OK - end - end + REPLY_PROCESSOR = { + "exists" => BOOLEAN_PROCESSOR, + "sismember" => BOOLEAN_PROCESSOR, + "sadd" => BOOLEAN_PROCESSOR, + "srem" => BOOLEAN_PROCESSOR, + "smove" => BOOLEAN_PROCESSOR, + "move" => BOOLEAN_PROCESSOR, + "setnx" => BOOLEAN_PROCESSOR, + "del" => BOOLEAN_PROCESSOR, + "renamenx" => BOOLEAN_PROCESSOR, + "expire" => BOOLEAN_PROCESSOR, + "keys" => lambda{|r| r.split(" ")}, + "info" => lambda{|r| + info = {} + r.each_line {|kv| + k,v = kv.split(":",2).map{|x| x.chomp} + info[k.to_sym] = v + } + info + } + } - def last_save - write "LASTSAVE\r\n" - get_response.to_i - end - - def bgsave - write "BGSAVE\r\n" - get_response == OK - end - - def info - info = {} - write("INFO\r\n") - x = get_response - x.each do |kv| - k,v = kv.split(':', 2) - k,v = k.chomp, v = v.chomp - info[k.to_sym] = v - end - info - end - - - def bulk_reply - begin - x = read.chomp - puts "bulk_reply read value is #{x.inspect}" if $debug - return x - rescue => e - puts "error in bulk_reply #{e}" if $debug - nil - end - end - - def write(data) - with_socket_management(@server) do |socket| - puts "writing: #{data}" if $debug - socket.write(data) - end - end - - def fetch(len) - with_socket_management(@server) do |socket| - len = [0, len.to_i].max - res = socket.read(len + 2) - res = res.chomp if res - res - end - end - - def read(length = read_proto) - with_socket_management(@server) do |socket| - res = socket.read(length) - puts "read is #{res.inspect}" if $debug - res - end - end + ALIASES = { + "flush_db" => "flushdb", + "flush_all" => "flushall", + "last_save" => "lastsave", + "key?" => "exists", + "delete" => "del", + "randkey" => "randomkey", + "list_length" => "llen", + "push_tail" => "rpush", + "push_head" => "lpush", + "pop_tail" => "rpop", + "pop_head" => "lpop", + "list_set" => "lset", + "list_range" => "lrange", + "list_trim" => "ltrim", + "list_index" => "lindex", + "list_rm" => "lrem", + "set_add" => "sadd", + "set_delete" => "srem", + "set_count" => "scard", + "set_member?" => "sismember", + "set_members" => "smembers", + "set_intersect" => "sinter", + "set_intersect_store" => "sinterstore", + "set_inter_store" => "sinterstore", + "set_union" => "sunion", + "set_union_store" => "sunionstore", + "set_diff" => "sdiff", + "set_diff_store" => "sdiffstore", + "set_move" => "smove", + "set_unless_exists" => "setnx", + "rename_unless_exists" => "renamenx", + "type?" => "type" + } - def keys(glob) - write "KEYS #{glob}\r\n" - get_response.split(' ') - end + DISABLED_COMMANDS = { + "monitor" => true, + "sync" => true + } - def rename!(oldkey, newkey) - write "RENAME #{oldkey} #{newkey}\r\n" - get_response - end - - def rename(oldkey, newkey) - write "RENAMENX #{oldkey} #{newkey}\r\n" - case get_response - when -1 - raise RedisRenameError, "source key: #{oldkey} does not exist" - when 0 - raise RedisRenameError, "target key: #{oldkey} already exists" - when -3 - raise RedisRenameError, "source and destination keys are the same" - when 1 - true - end - end - - def key?(key) - write "EXISTS #{key}\r\n" - get_response == 1 - end - - def delete(key) - write "DEL #{key}\r\n" - get_response == 1 - end - - def [](key) - get(key) + def initialize(options = {}) + @host = options[:host] || '127.0.0.1' + @port = (options[:port] || 6379).to_i + @db = (options[:db] || 0).to_i + @timeout = (options[:timeout] || 5).to_i + $debug = options[:debug] + connect_to_server end - def get(key) - write "GET #{key}\r\n" - get_response + def to_s + "Redis Client connected to #{@host}:#{@port} against DB #{@db}" end - - def mget(*keys) - write "MGET #{keys.join(' ')}\r\n" - get_response - end - def incr(key, increment=nil) - if increment - write "INCRBY #{key} #{increment}\r\n" - else - write "INCR #{key}\r\n" - end - get_response + def connect_to_server + @sock = connect_to(@host, @port, @timeout == 0 ? nil : @timeout) + call_command(["select",@db]) unless @db == 0 end - def decr(key, decrement=nil) - if decrement - write "DECRBY #{key} #{decrement}\r\n" + def connect_to(host, port, timeout=nil) + # We support connect() timeout only if system_timer is availabe + # or if we are running against Ruby >= 1.9 + # Timeout reading from the socket instead will be supported anyway. + if @timeout != 0 and RedisTimer + begin + sock = TCPSocket.new(host, port) + rescue Timeout::Error + @sock = nil + raise Timeout::Error, "Timeout connecting to the server" + end else - write "DECR #{key}\r\n" - end - get_response - end - - def randkey - write "RANDOMKEY\r\n" - get_response - end + sock = TCPSocket.new(host, port) + end + sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 - def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - i + # If the timeout is set we set the low level socket options in order + # to make sure a blocking read will return after the specified number + # of seconds. This hack is from memcached ruby client. + if timeout + secs = Integer(timeout) + usecs = Integer((timeout - secs) * 1_000_000) + optval = [secs, usecs].pack("l_2") + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval + sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval end + sock end - def type?(key) - write "TYPE #{key}\r\n" - get_response + def method_missing(*argv) + call_command(argv) end - - def push_tail(key, string) - write "RPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response - end - def push_head(key, string) - write "LPUSH #{key} #{string.to_s.size}\r\n#{string.to_s}\r\n" - get_response + def call_command(argv) + puts argv.inspect if $debug + # this wrapper to raw_call_command handle reconnection on socket + # error. We try to reconnect just one time, otherwise let the error + # araise. + connect_to_server if !@sock + begin + raw_call_command(argv.dup) + rescue Errno::ECONNRESET, Errno::EPIPE + @sock.close + @sock = nil + connect_to_server + raw_call_command(argv.dup) + end end - - def pop_head(key) - write "LPOP #{key}\r\n" - get_response - end - def pop_tail(key) - write "RPOP #{key}\r\n" - get_response - end + def raw_call_command(argvp) + pipeline = argvp[0].is_a?(Array) - def list_set(key, index, val) - write "LSET #{key} #{index} #{val.to_s.size}\r\n#{val}\r\n" - get_response == OK - end - - def list_length(key) - write "LLEN #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} does not hold a list value" + unless pipeline + argvv = [argvp] else - i + argvv = argvp end - end - def list_range(key, start, ending) - write "LRANGE #{key} #{start} #{ending}\r\n" - get_response - end + command = '' - def list_trim(key, start, ending) - write "LTRIM #{key} #{start} #{ending}\r\n" - get_response - end + argvv.each do |argv| + bulk = nil + argv[0] = argv[0].to_s.downcase + argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] + raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] + if BULK_COMMANDS[argv[0]] and argv.length > 1 + bulk = argv[-1].to_s + argv[-1] = bulk.length + end + command << argv.join(' ') + "\r\n" + command << bulk + "\r\n" if bulk + end - def list_index(key, index) - write "LINDEX #{key} #{index}\r\n" - get_response - end + @sock.write(command) - def list_rm(key, count, value) - write "LREM #{key} #{count} #{value.to_s.size}\r\n#{value}\r\n" - case num = get_response - when -1 - raise RedisError, "key: #{key} does not exist" - when -2 - raise RedisError, "key: #{key} does not hold a list value" - else - num + results = argvv.map do |argv| + processor = REPLY_PROCESSOR[argv[0]] + processor ? processor.call(read_reply) : read_reply end - end - def set_add(key, member) - write "SADD #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" - end + return pipeline ? results : results[0] end - def set_delete(key, member) - write "SREM #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" - end + def select(*args) + raise "SELECT not allowed, use the :db option when creating the object" end - def set_count(key) - write "SCARD #{key}\r\n" - case i = get_response - when -2 - raise RedisError, "key: #{key} contains a non set value" - else - i - end + def [](key) + get(key) end - def set_member?(key, member) - write "SISMEMBER #{key} #{member.to_s.size}\r\n#{member}\r\n" - case get_response - when 1 - true - when 0 - false - when -2 - raise RedisError, "key: #{key} contains a non set value" - end + def []=(key,value) + set(key,value) end - def set_members(key) - write "SMEMBERS #{key}\r\n" - Set.new(get_response) + def set(key, value, expiry=nil) + s = call_command([:set, key, value]) == OK + expire(key, expiry) if s && expiry + s end - def set_intersect(*keys) - write "SINTER #{keys.join(' ')}\r\n" - Set.new(get_response) + def sort(key, options = {}) + cmd = [] + cmd << "SORT #{key}" + cmd << "BY #{options[:by]}" if options[:by] + cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get] + cmd << "#{options[:order]}" if options[:order] + cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit] + call_command(cmd) end - def set_inter_store(destkey, *keys) - write "SINTERSTORE #{destkey} #{keys.join(' ')}\r\n" - get_response + def incr(key, increment = nil) + call_command(increment ? ["incrby",key,increment] : ["incr",key]) end - def sort(key, opts={}) - cmd = "SORT #{key}" - cmd << " BY #{opts[:by]}" if opts[:by] - cmd << " GET #{opts[:get]}" if opts[:get] - cmd << " INCR #{opts[:incr]}" if opts[:incr] - cmd << " DEL #{opts[:del]}" if opts[:del] - cmd << " DECR #{opts[:decr]}" if opts[:decr] - cmd << " #{opts[:order]}" if opts[:order] - cmd << " LIMIT #{opts[:limit].join(' ')}" if opts[:limit] - cmd << "\r\n" - write(cmd) - get_response + def decr(key,decrement = nil) + call_command(decrement ? ["decrby",key,decrement] : ["decr",key]) end - - def multi_bulk - res = read_proto - puts "mb res is #{res.inspect}" if $debug - list = [] - Integer(res).times do - vf = get_response - puts "curren vf is #{vf.inspect}" if $debug - list << vf - puts "current list is #{list.inspect}" if $debug - end - list + + # Ruby defines a now deprecated type method so we need to override it here + # since it will never hit method_missing + def type(key) + call_command(['type', key]) end - - def get_reply - begin - r = read(1) - raise RedisError if (r == "\r" || r == "\n") - rescue RedisError - retry - end - r + + def quit + call_command(['quit']) + rescue Errno::ECONNRESET end - - def []=(key, val) - set(key,val) - end - - def set(key, val, expiry=nil) - write("SET #{key} #{val.to_s.size}\r\n#{val}\r\n") - get_response == OK + def pipelined(&block) + pipeline = Pipeline.new self + yield pipeline + pipeline.execute end - def set_unless_exists(key, val) - write "SETNX #{key} #{val.to_s.size}\r\n#{val}\r\n" - get_response == 1 - end - - def status_code_reply + def read_reply + # We read the first byte using read() mainly because gets() is + # immune to raw socket timeouts. begin - res = read_proto - if res.index('-') == 0 - raise RedisError, res - else - true - end - rescue RedisError - raise RedisError + rtype = @sock.read(1) + rescue Errno::EAGAIN + # We want to make sure it reconnects on the next command after the + # timeout. Otherwise the server may reply in the meantime leaving + # the protocol in a desync status. + @sock = nil + raise Errno::EAGAIN, "Timeout reading from the socket" end - end - - def get_response - begin - rtype = get_reply - rescue => e - raise RedisError, e.inspect - end - puts "reply_type is #{rtype.inspect}" if $debug + + raise Errno::ECONNRESET,"Connection lost" if !rtype + line = @sock.gets case rtype - when SINGLE - single_line - when BULK - bulk_reply - when MULTI - multi_bulk - when INT - integer_reply - when ERR - raise RedisError, single_line + when MINUS + raise MINUS + line.strip + when PLUS + line.strip + when COLON + line.to_i + when DOLLAR + bulklen = line.to_i + return nil if bulklen == -1 + data = @sock.read(bulklen) + @sock.read(2) # CRLF + data + when ASTERISK + objects = line.to_i + return nil if bulklen == -1 + res = [] + objects.times { + res << read_reply + } + res else - raise RedisError, "Unknown response.." + raise "Protocol error, got '#{rtype}' as initial reply byte" end end - - def integer_reply - Integer(read_proto) - end - - def single_line - buff = "" - while buff[-2..-1] != "\r\n" - buff << read(1) - end - puts "single_line value is #{buff[0..-3].inspect}" if $debug - buff[0..-3] - end - - def read_proto - with_socket_management(@server) do |socket| - if res = socket.gets - x = res.chomp - puts "read_proto is #{x.inspect}\n\n" if $debug - x.to_i - end - end - end - end