lib/redis_cluster/pool.rb in redis_cluster-0.3.1 vs lib/redis_cluster/pool.rb in redis_cluster-0.3.2
- old
+ new
@@ -52,10 +52,36 @@
def pipelined(args, &block)
random_node.execute :pipelined, args, &block
end
+ # Implements scan across all nodes in the pool.
+ # Cursors will behave strangely if the node list changes during iteration.
+ def scan(args)
+ scan_cursor = args.first
+ options = args[1] || {}
+ node_cursor, node_index = decode_scan_cursor(scan_cursor)
+ next_node_cursor, result = @nodes[node_index].execute("scan", [node_cursor, options])
+ [encode_next_scan_cursor(next_node_cursor, node_index), result]
+ end
+
private
+
+ def encode_next_scan_cursor(next_node_cursor, node_index)
+ if next_node_cursor == '0'
+ # '0' indicates the end of iteration. Advance the node index and
+ # start at the '0' position on the next node. If this was the last node,
+ # loop around and return '0' to indicate iteration is done.
+ ((node_index + 1) % @nodes.size)
+ else
+ ((next_node_cursor.to_i * @nodes.size) + node_index)
+ end.to_s # Cursors are strings
+ end
+
+ def decode_scan_cursor(cursor)
+ node_cursor, node_index = cursor.to_i.divmod(@nodes.size)
+ [node_cursor.to_s, node_index] # Cursors are strings.
+ end
def node_by(key)
slot = Slot.slot_by(key)
@nodes.find { |node| node.has_slot?(slot) }
end