lib/riak/multiget.rb in riak-client-2.5.0 vs lib/riak/multiget.rb in riak-client-2.6.0
- old
+ new
@@ -1,122 +1,22 @@
require 'riak/client'
require 'riak/bucket'
+require 'riak/multi'
module Riak
- # Coordinates a parallel fetch operation for multiple values.
- class Multiget
- include Util::Translation
-
- # @return [Riak::Client] the associated client
- attr_reader :client
-
- # @return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- attr_reader :fetch_list
-
- # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
- attr_accessor :result_hash
-
- # @return [Boolean] finished if the fetch operation has completed
- attr_reader :finished
-
- # @return [Integer] The number of threads to use
- attr_accessor :thread_count
-
- # Perform a Riak Multiget operation.
- # @param [Client] client the {Riak::Client} that will perform the multiget
- # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
- def self.get_all(client, fetch_list)
- multi = new client, fetch_list
- multi.fetch
- multi.results
+ # Coordinates a parallel fetch operation for multiple keys.
+ class Multiget < Multi
+ # @deprecated use perform
+ class << self
+ alias_method :get_all, :perform
end
- # Create a Riak Multiget operation.
- # @param [Client] client the {Riak::Client} that will perform the multiget
- # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch
- def initialize(client, fetch_list)
- raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
- raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array
-
- validate_fetch_list fetch_list
- @client, @fetch_list = client, fetch_list.uniq
- self.result_hash = Hash.new
- @finished = false
- self.thread_count = client.multiget_threads
- end
-
- # Starts the parallelized fetch operation
- # @raise [ArgumentError] when a non-positive-Integer count is given
- def fetch
- queue = fetch_list.dup
- queue_mutex = Mutex.new
- result_mutex = Mutex.new
-
- unless thread_count.is_a?(Integer) && thread_count > 0
- raise ArgumentError, t("invalid_multiget_thread_count")
- end
-
- @threads = 1.upto(thread_count).map do |_node|
- Thread.new do
- loop do
- pair = queue_mutex.synchronize do
- queue.shift
- end
-
- break if pair.nil?
-
- found = attempt_fetch(*pair)
- result_mutex.synchronize do
- result_hash[pair] = found
- end
- end
- end
- end
- end
-
- def results
- wait_for_finish
- result_hash
- end
-
- def finished?
- set_finished_for_thread_liveness
- finished
- end
-
- def wait_for_finish
- return if finished?
- @threads.each {|t| t.join }
- @finished = true
- end
-
private
- def attempt_fetch(bucket, key)
+ def work(bucket, key)
bucket[key]
rescue Riak::FailedRequest => e
raise e unless e.not_found?
nil
- end
-
- def set_finished_for_thread_liveness
- return if @finished # already done
-
- all_dead = @threads.none? {|t| t.alive? }
- return unless all_dead # still working
-
- @finished = true
- return
- end
-
- def validate_fetch_list(fetch_list)
- return unless erroneous = fetch_list.detect do |e|
- bucket, key = e
- next true unless bucket.is_a? Bucket
- next true unless key.is_a? String
- end
-
- raise ArgumentError, t('fetch_list_type', :problem => erroneous)
end
end
end