lib/oxblood/commands/lists.rb in oxblood-0.1.0.dev9 vs lib/oxblood/commands/lists.rb in oxblood-0.1.0.dev10

- old
+ new

@@ -1,8 +1,51 @@ module Oxblood module Commands module Lists + # Remove and get the first element in a list, or block until one + # is available + # @see http://redis.io/commands/blpop + # + # @param [String, Array<String>] keys + # @param [Integer] timeout in seconds + # + # @return [nil] when no element could be popped and the timeout expired + # @return [[String, String]] a two-element multi-bulk with the first + # element being the name of the key where an element was popped and + # the second element being the value of the popped element + def blpop(*keys, timeout) + blocking_pop(:BLPOP, keys, timeout) + end + + # Remove and get the last element in a list, or block until one + # is available + # @see http://redis.io/commands/brpop + # + # @param [String, Array<String>] keys + # @param [Integer] timeout in seconds + # + # @return [nil] when no element could be popped and the timeout expired + # @return [[String, String]] a two-element multi-bulk with the first + # element being the name of the key where an element was popped and + # the second element being the value of the popped element + def brpop(*keys, timeout) + blocking_pop(:BRPOP, keys, timeout) + end + + # Pop a value from a list, push it to another list and return it; + # or block until one is available + # @see http://redis.io/commands/brpoplpush + # + # @param [String] source + # @param [String] destination + # + # @return [nil] when no element could be popped and the timeout expired + # @return [String] the element being popped and pushed + def brpoplpush(source, destination, timeout) + blocking_pop(:BRPOPLPUSH, [source, destination], timeout) + end + # Get an element from a list by its index # @see http://www.redis.io/commands/lindex # # @param [String] key # @param [Integer] index zero-based of element in the list @@ -161,9 +204,35 @@ # @param [String] value # # @return [Integer] the length of the list after the push operation def rpushx(key, value) run(:RPUSHX, key, value) + end + + private + + # @note Mutates keys argument! + def blocking_pop(command, keys, timeout) + with_custom_timeout(timeout) do + run(*keys.unshift(command).push(timeout)) + end + end + + # Temporary increase socket timeout for blocking operations + # @note non-threadsafe! + def with_custom_timeout(timeout) + old_timeout = connection.socket.timeout + + if timeout.zero? + # Indefinite blocking means 0 on redis server and nil on ruby + connection.socket.timeout = nil + else + connection.socket.timeout += timeout + end + + yield + ensure + connection.socket.timeout = old_timeout end end end end