lib/oxblood/commands/lists.rb in oxblood-0.1.0.dev9 vs lib/oxblood/commands/lists.rb in oxblood-0.1.0.dev10
- old
+ new
@@ -1,8 +1,51 @@
module Oxblood
module Commands
module Lists
+ # Remove and get the first element in a list, or block until one
+ # is available
+ # @see http://redis.io/commands/blpop
+ #
+ # @param [String, Array<String>] keys
+ # @param [Integer] timeout in seconds
+ #
+ # @return [nil] when no element could be popped and the timeout expired
+ # @return [[String, String]] a two-element multi-bulk with the first
+ # element being the name of the key where an element was popped and
+ # the second element being the value of the popped element
+ def blpop(*keys, timeout)
+ blocking_pop(:BLPOP, keys, timeout)
+ end
+
+ # Remove and get the last element in a list, or block until one
+ # is available
+ # @see http://redis.io/commands/brpop
+ #
+ # @param [String, Array<String>] keys
+ # @param [Integer] timeout in seconds
+ #
+ # @return [nil] when no element could be popped and the timeout expired
+ # @return [[String, String]] a two-element multi-bulk with the first
+ # element being the name of the key where an element was popped and
+ # the second element being the value of the popped element
+ def brpop(*keys, timeout)
+ blocking_pop(:BRPOP, keys, timeout)
+ end
+
+ # Pop a value from a list, push it to another list and return it;
+ # or block until one is available
+ # @see http://redis.io/commands/brpoplpush
+ #
+ # @param [String] source
+ # @param [String] destination
+ #
+ # @return [nil] when no element could be popped and the timeout expired
+ # @return [String] the element being popped and pushed
+ def brpoplpush(source, destination, timeout)
+ blocking_pop(:BRPOPLPUSH, [source, destination], timeout)
+ end
+
# Get an element from a list by its index
# @see http://www.redis.io/commands/lindex
#
# @param [String] key
# @param [Integer] index zero-based of element in the list
@@ -161,9 +204,35 @@
# @param [String] value
#
# @return [Integer] the length of the list after the push operation
def rpushx(key, value)
run(:RPUSHX, key, value)
+ end
+
+ private
+
+ # @note Mutates keys argument!
+ def blocking_pop(command, keys, timeout)
+ with_custom_timeout(timeout) do
+ run(*keys.unshift(command).push(timeout))
+ end
+ end
+
+ # Temporary increase socket timeout for blocking operations
+ # @note non-threadsafe!
+ def with_custom_timeout(timeout)
+ old_timeout = connection.socket.timeout
+
+ if timeout.zero?
+ # Indefinite blocking means 0 on redis server and nil on ruby
+ connection.socket.timeout = nil
+ else
+ connection.socket.timeout += timeout
+ end
+
+ yield
+ ensure
+ connection.socket.timeout = old_timeout
end
end
end
end