require 'uri' module EventMachine::Hiredis # Emits the following events # # * :connected - on successful connection or reconnection # * :reconnected - on successful reconnection # * :disconnected - no longer connected, when previously in connected state # * :reconnect_failed(failure_number) - a reconnect attempt failed # This event is passed number of failures so far (1,2,3...) # * :monitor # class BaseClient include EventEmitter include EM::Deferrable attr_reader :host, :port, :password, :db def initialize(host='localhost', port='6379', password=nil, db=nil) @host, @port, @password, @db = host, port, password, db @defs = [] @command_queue = [] @closing_connection = false @reconnect_failed_count = 0 @reconnect_timer = nil @failed = false self.on(:failed) { @failed = true @command_queue.each do |df, _, _| df.fail(Error.new("Redis connection in failed state")) end @command_queue = [] } end # Configure the redis connection to use # # In usual operation, the uri should be passed to initialize. This method # is useful for example when failing over to a slave connection at runtime # def configure(uri_string) uri = URI(uri_string) @host = uri.host @port = uri.port @password = uri.password path = uri.path[1..-1] @db = path.empty? ? nil : path end def connect @connection = EM.connect(@host, @port, Connection, @host, @port) @connection.on(:closed) do if @connected @defs.each { |d| d.fail(Error.new("Redis disconnected")) } @defs = [] @deferred_status = nil @connected = false unless @closing_connection reconnect end emit(:disconnected) EM::Hiredis.logger.info("#{@connection.to_s} disconnected") else unless @closing_connection @reconnect_failed_count += 1 @reconnect_timer = EM.add_timer(EM::Hiredis.reconnect_timeout) { @reconnect_timer = nil reconnect } emit(:reconnect_failed, @reconnect_failed_count) EM::Hiredis.logger.info("#{@connection.to_s} reconnect failed") if @reconnect_failed_count >= 4 emit(:failed) self.fail(Error.new("Could not connect after 4 attempts")) end end end end @connection.on(:connected) do @connected = true @reconnect_failed_count = 0 @failed = false select(@db) if @db auth(@password) if @password @command_queue.each do |df, command, args| @connection.send_command(command, *args) @defs.push(df) end @command_queue = [] emit(:connected) EM::Hiredis.logger.info("#{@connection.to_s} connected") succeed if @reconnecting @reconnecting = false emit(:reconnected) end end @connection.on(:message) do |reply| if RuntimeError === reply raise "Replies out of sync: #{reply.inspect}" if @defs.empty? deferred = @defs.shift error = Error.new("Error reply from redis") error.redis_error = reply deferred.fail(error) if deferred else handle_reply(reply) end end @connected = false @reconnecting = false return self end # Indicates that commands have been sent to redis but a reply has not yet # been received # # This can be useful for example to avoid stopping the # eventmachine reactor while there are outstanding commands # def pending_commands? @connected && @defs.size > 0 end def connected? @connected end def select(db, &blk) @db = db method_missing(:select, db, &blk) end def auth(password, &blk) @password = password method_missing(:auth, password, &blk) end def close_connection EM.cancel_timer(@reconnect_timer) if @reconnect_timer @closing_connection = true @connection.close_connection_after_writing end def reconnect_connection EM.cancel_timer(@reconnect_timer) if @reconnect_timer reconnect end private def method_missing(sym, *args) deferred = EM::DefaultDeferrable.new # Shortcut for defining the callback case with just a block deferred.callback { |result| yield(result) } if block_given? if @connected @connection.send_command(sym, *args) @defs.push(deferred) elsif @failed deferred.fail(Error.new("Redis connection in failed state")) else @command_queue << [deferred, sym, args] end deferred end def reconnect @reconnecting = true @connection.reconnect @host, @port EM::Hiredis.logger.info("#{@connection.to_s} reconnecting") end def handle_reply(reply) if @defs.empty? if @monitoring emit(:monitor, reply) else raise "Replies out of sync: #{reply.inspect}" end else deferred = @defs.shift deferred.succeed(reply) if deferred end end end end