lib/riak/multiget.rb in riak-client-2.4.1 vs lib/riak/multiget.rb in riak-client-2.5.0
- old
+ new
@@ -1,122 +1,122 @@
-require 'riak/client'
-require 'riak/bucket'
-
-module Riak
- # Coordinates a parallel fetch operation for multiple values.
- class Multiget
- include Util::Translation
-
- # @return [Riak::Client] the associated client
- attr_reader :client
-
- # @return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- attr_reader :fetch_list
-
- # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
- attr_accessor :result_hash
-
- # @return [Boolean] finished if the fetch operation has completed
- attr_reader :finished
-
- # @return [Integer] The number of threads to use
- attr_accessor :thread_count
-
- # Perform a Riak Multiget operation.
- # @param [Client] client the {Riak::Client} that will perform the multiget
- # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
- def self.get_all(client, fetch_list)
- multi = new client, fetch_list
- multi.fetch
- multi.results
- end
-
- # Create a Riak Multiget operation.
- # @param [Client] client the {Riak::Client} that will perform the multiget
- # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- def initialize(client, fetch_list)
- raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
- raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array
-
- validate_fetch_list fetch_list
- @client, @fetch_list = client, fetch_list.uniq
- self.result_hash = Hash.new
- @finished = false
- self.thread_count = client.multiget_threads
- end
-
- # Starts the parallelized fetch operation
- # @raise [ArgumentError] when a non-positive-Integer count is given
- def fetch
- queue = fetch_list.dup
- queue_mutex = Mutex.new
- result_mutex = Mutex.new
-
- unless thread_count.is_a?(Integer) && thread_count > 0
- raise ArgumentError, t("invalid_multiget_thread_count")
- end
-
- @threads = 1.upto(thread_count).map do |_node|
- Thread.new do
- loop do
- pair = queue_mutex.synchronize do
- queue.shift
- end
-
- break if pair.nil?
-
- found = attempt_fetch(*pair)
- result_mutex.synchronize do
- result_hash[pair] = found
- end
- end
- end
- end
- end
-
- def results
- wait_for_finish
- result_hash
- end
-
- def finished?
- set_finished_for_thread_liveness
- finished
- end
-
- def wait_for_finish
- return if finished?
- @threads.each {|t| t.join }
- @finished = true
- end
-
- private
-
- def attempt_fetch(bucket, key)
- bucket[key]
- rescue Riak::FailedRequest => e
- raise e unless e.not_found?
- nil
- end
-
- def set_finished_for_thread_liveness
- return if @finished # already done
-
- all_dead = @threads.none? {|t| t.alive? }
- return unless all_dead # still working
-
- @finished = true
- return
- end
-
- def validate_fetch_list(fetch_list)
- return unless erroneous = fetch_list.detect do |e|
- bucket, key = e
- next true unless bucket.is_a? Bucket
- next true unless key.is_a? String
- end
-
- raise ArgumentError, t('fetch_list_type', :problem => erroneous)
- end
- end
-end
+require 'riak/client'
+require 'riak/bucket'
+
+module Riak
+ # Coordinates a parallel fetch operation for multiple values.
+ class Multiget
+ include Util::Translation
+
+ # @return [Riak::Client] the associated client
+ attr_reader :client
+
+ # @return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
+ attr_reader :fetch_list
+
+ # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
+ attr_accessor :result_hash
+
+ # @return [Boolean] finished if the fetch operation has completed
+ attr_reader :finished
+
+ # @return [Integer] The number of threads to use
+ attr_accessor :thread_count
+
+ # Perform a Riak Multiget operation.
+ # @param [Client] client the {Riak::Client} that will perform the multiget
+ # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
+ # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
+ def self.get_all(client, fetch_list)
+ multi = new client, fetch_list
+ multi.fetch
+ multi.results
+ end
+
+ # Create a Riak Multiget operation.
+ # @param [Client] client the {Riak::Client} that will perform the multiget
+ # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
+ def initialize(client, fetch_list)
+ raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
+ raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array
+
+ validate_fetch_list fetch_list
+ @client, @fetch_list = client, fetch_list.uniq
+ self.result_hash = Hash.new
+ @finished = false
+ self.thread_count = client.multiget_threads
+ end
+
+ # Starts the parallelized fetch operation
+ # @raise [ArgumentError] when a non-positive-Integer count is given
+ def fetch
+ queue = fetch_list.dup
+ queue_mutex = Mutex.new
+ result_mutex = Mutex.new
+
+ unless thread_count.is_a?(Integer) && thread_count > 0
+ raise ArgumentError, t("invalid_multiget_thread_count")
+ end
+
+ @threads = 1.upto(thread_count).map do |_node|
+ Thread.new do
+ loop do
+ pair = queue_mutex.synchronize do
+ queue.shift
+ end
+
+ break if pair.nil?
+
+ found = attempt_fetch(*pair)
+ result_mutex.synchronize do
+ result_hash[pair] = found
+ end
+ end
+ end
+ end
+ end
+
+ def results
+ wait_for_finish
+ result_hash
+ end
+
+ def finished?
+ set_finished_for_thread_liveness
+ finished
+ end
+
+ def wait_for_finish
+ return if finished?
+ @threads.each {|t| t.join }
+ @finished = true
+ end
+
+ private
+
+ def attempt_fetch(bucket, key)
+ bucket[key]
+ rescue Riak::FailedRequest => e
+ raise e unless e.not_found?
+ nil
+ end
+
+ def set_finished_for_thread_liveness
+ return if @finished # already done
+
+ all_dead = @threads.none? {|t| t.alive? }
+ return unless all_dead # still working
+
+ @finished = true
+ return
+ end
+
+ def validate_fetch_list(fetch_list)
+ return unless erroneous = fetch_list.detect do |e|
+ bucket, key = e
+ next true unless bucket.is_a? Bucket
+ next true unless key.is_a? String
+ end
+
+ raise ArgumentError, t('fetch_list_type', :problem => erroneous)
+ end
+ end
+end