lib/riak/client.rb in riakpb-0.1.6 vs lib/riak/client.rb in riakpb-0.2.0
- old
+ new
@@ -1,9 +1,9 @@
require 'riak'
-module Riak
- # A client connection to Riak.
+module Riakpb
+ # A client connection to Riakpb.
class Client
include Util::Translation
include Util::MessageCode
autoload :Rpc, 'riak/client/rpc'
@@ -20,45 +20,53 @@
attr_reader :buckets
attr_reader :bucket_cache
attr_reader :node
attr_reader :server_version
attr_reader :client_id
-
- # Creates a client connection to Riak's Protobuf Listener
- # @param [String] options configuration options for the client
- # @param [String] host ('127.0.0.1') The host or IP address for the Riak endpoint
- # @param [Fixnum] port (8087) The port of the Riak protobuf listener endpoint
+ attr_reader :options
+
+ # Creates a client connection to Riakpb's Protobuf Listener
+ # @options [Hash] options configuration options for the client
def initialize(options={})
self.host = options[:host] || "127.0.0.1"
self.port = options[:port] || 8087
- self.client_id = options[:client_id] unless options[:client_id].nil?
- @w = options[:w]
- @dw = options[:dw]
+ self.client_id = options[:client_id] unless options[:client_id].blank?
+
+ read_quorum = options[:r] || options[:read_quorum]
+ write_quorum = options[:w] || options[:write_quorum]
+ replica_commit = options[:dw] || options[:replica_commit]
+ return_body = options[:rb] || options[:return_body] || true
+
+ @options = options.slice!(:host, :port, :client_id, :r, :read_quorum, :w, :write_quorum, :dw, :replica_commit, :rb, :return_body)
+ @options[:r] = read_quorum unless read_quorum.blank?
+ @options[:w] = write_quorum unless write_quorum.blank?
+ @options[:dw] = replica_commit unless replica_commit.blank?
+ @options[:return_body] = return_body unless return_body.blank?
+
@buckets = []
- @bucket_cache = Hash.new{|k,v| k[v] = Riak::Bucket.new(self, v)}
+ @bucket_cache = Hash.new{|k,v| k[v] = Riakpb::Bucket.new(self, v, @options)}
end
-
- # Set the hostname of the Riak endpoint. Must be an IPv4, IPv6, or valid hostname
- # @param [String] value The host or IP address for the Riak endpoint
+ # Set the hostname of the Riakpb endpoint. Must be an IPv4, IPv6, or valid hostname
+ # @param [String] value The host or IP address for the Riakpb endpoint
# @raise [ArgumentError] if an invalid hostname is given
# @return [String] the assigned hostname
def host=(value)
raise ArgumentError, t("hostname_invalid") unless value.is_a?(String) && value =~ HOST_REGEX
@host = value
end
- # Set the port number of the Riak endpoint. This must be an integer between 0 and 65535.
- # @param [Fixnum] value The port number of the Riak endpoint
+ # Set the port number of the Riakpb endpoint. This must be an integer between 0 and 65535.
+ # @param [Fixnum] value The port number of the Riakpb endpoint
# @raise [ArgumentError] if an invalid port number is given
# @return [Fixnum] the assigned port number
def port=(value)
raise ArgumentError, t("port_invalid") unless (0..65535).include?(value)
@port = value
end
# Set the client ID for this client. Must be a string or Fixnum value 0 =< value < MAX_CLIENT_ID.
- # @param [String, Fixnum] value The internal client ID used by Riak to route responses
+ # @param [String, Fixnum] value The internal client ID used by Riakpb to route responses
# @raise [ArgumentError] when an invalid client ID is given
# @return [String] the assigned client ID
def client_id=(value)
@client_id = case value
when 0...MAX_CLIENT_ID
@@ -69,50 +77,50 @@
raise ArgumentError, t("invalid_client_id", :max_id => MAX_CLIENT_ID)
end
end
# Establish a connection to the riak node, and store the Rpc instance
- # @return [Riak::Client::Rpc] the Rpc instance that handles connections to the riak node
+ # @return [Riakpb::Client::Rpc] the Rpc instance that handles connections to the riak node
def rpc(options={})
options[:client_id] ||= @client_id if @client_id
@rpc ||= Rpc.new(self)
end
- # Tests connectivity with the Riak host.
+ # Tests connectivity with the Riakpb host.
# @return [Boolean] Successful returned as 'true', failed connection returned as 'false'
def ping?
rpc.request Util::MessageCode::PING_REQUEST
return rpc.status
end
# Retrieves basic information from the riak node.
# @return [Hash] Returns the name of the node and its software release number
def info
- response = rpc.request Riak::Util::MessageCode::GET_SERVER_INFO_REQUEST
+ response = rpc.request Riakpb::Util::MessageCode::GET_SERVER_INFO_REQUEST
@node = response.node
@server_version = response.server_version
{:node => @node, :server_version => @server_version}
end
- # I need bucket! Bring me bucket! (Retrieves a bucket from Riak. Eating disorder not included.)
+ # I need bucket! Bring me bucket! (Retrieves a bucket from Riakpb. Eating disorder not included.)
# @param [String] bucket the bucket to retrieve
# @return [Bucket] the requested bucket
def bucket(bucket)
return(@bucket_cache[bucket]) if @bucket_cache.has_key?(bucket)
- bring_me_bucket!(bucket)
+ self.bucket!(bucket)
end
alias :[] :bucket
alias :bring_me_bucket :bucket
- # I need bucket! Bring me bucket! (Retrieves a bucket from Riak, even if it's already been retrieved.)
+ # I need bucket! Bring me bucket! (Retrieves a bucket from Riakpb, even if it's already been retrieved.)
# @param [String] bucket the bucket to retrieve
# @return [Bucket] the requested bucket
def bucket!(bucket)
- request = Riak::RpbGetBucketReq.new(:bucket => bucket)
+ request = Riakpb::RpbGetBucketReq.new(:bucket => bucket)
response = rpc.request(
Util::MessageCode::GET_BUCKET_REQUEST,
request
)
@bucket_cache[bucket].load(response)
@@ -122,16 +130,16 @@
# Set the properties for a given bucket, and then reload it.
# @param [String] bucket the bucket name in which props will be set
# @param [RpbBucketProps, Hash] props the properties to be set within the given bucket
# @return [TrueClass, FalseClass] whether or not the operation was successful
def set_bucket(bucket, props)
- props = Riak::RpbBucketProps.new(props) if props.is_a?(Hash)
+ props = Riakpb::RpbBucketProps.new(props) if props.is_a?(Hash)
- raise TypeError.new t('invalid_props') unless props.is_a?(Riak::RpbBucketProps)
+ raise TypeError.new t('invalid_props') unless props.is_a?(Riakpb::RpbBucketProps)
begin
- request = Riak::RpbSetBucketReq.new(:bucket => bucket, :props => props)
+ request = Riakpb::RpbSetBucketReq.new(:bucket => bucket, :props => props)
response = rpc.request(
Util::MessageCode::SET_BUCKET_REQUEST,
request
)
self.bucket!(bucket)
@@ -140,19 +148,25 @@
rescue FailedRequest
return(false)
end
end
- # Retrieves a key, using RpbGetReq, from within a given bucket, from Riak.
+ # Retrieves a key, using RpbGetReq, from within a given bucket, from Riakpb.
# @param [String] bucket the bucket from which to retrieve the key
# @param [String] key the name of the key to be received
# @param [Fixnum] quorum read quorum- num of replicas need to agree when retrieving the object
# @return [RpbGetResp] the response in which the given Key is stored
def get_request(bucket, key, quorum=nil)
- request = Riak::RpbGetReq.new({:bucket => bucket, :key => key})
- request.r = quorum if quorum.is_a?(Fixnum)
+ request = Riakpb::RpbGetReq.new({:bucket => bucket, :key => key})
+ quorum ||= @read_quorum
+ unless quorum.blank?
+ quorum = quorum.to_i
+ request.r = quorum
+ end
+
+
response = rpc.request(
Util::MessageCode::GET_REQUEST,
request
)
@@ -167,71 +181,74 @@
# @option options [Boolean] :return_body whether to return the contents of the stored object.
# @return [RpbPutResp] the response confirming Key storage and (optionally) the Key's updated/new data.
def put_request(options)
raise ArgumentError, t('invalid_bucket') if options[:bucket].empty?
raise ArgumentError, t('empty_content') if options[:content].nil?
+ options[:w] ||= @write_quorum unless @write_quorum.nil?
+ options[:dw] ||= @replica_commit unless @replica_commit.nil?
+ options[:return_body] = @return_body unless options.has_key?(:return_body)
- options[:w] ||= @w unless @w.nil?
- options[:dw] ||= @dw unless @dw.nil?
- options[:return_body] ||= true
-
- request = Riak::RpbPutReq.new(options)
+ request = Riakpb::RpbPutReq.new(options.slice :bucket, :key, :vclock, :content, :w, :dw, :return_body)
response = rpc.request(
Util::MessageCode::PUT_REQUEST,
request
)
+ return(true) if response == ""
return(response)
end
- # Deletes a key, using RpbDelReq, from within a given bucket, from Riak.
+ # Deletes a key, using RpbDelReq, from within a given bucket, from Riakpb.
# @param [String] bucket the bucket from which to delete the key
# @param [String] key the name of the key to be deleted
# @param [Fixnum] rw how many replicas to delete before returning a successful response
# @return [RpbGetResp] the response confirming deletion
def del_request(bucket, key, rw=nil)
- request = Riak::RpbDelReq.new
+ request = Riakpb::RpbDelReq.new
request.bucket = bucket
request.key = key
request.rw ||= rw
response = rpc.request(
Util::MessageCode::DEL_REQUEST,
request
)
+
+ return(true) if response == ""
+ return(response)
end
# Sends a MapReduce operation to riak, using RpbMapRedReq, and returns the Response/phases.
# @param [String] mr_request map/reduce job, encoded/stringified
# @param [String] content_type encoding for map/reduce job
# @return [RpbMapRedResp] the response, encoded in the same format that was sent
def map_reduce_request(mr_request, content_type)
- request = Riak::RpbMapRedReq.new
+ request = Riakpb::RpbMapRedReq.new
request.request = mr_request
request.content_type = content_type
response = rpc.request(
Util::MessageCode::MAP_REDUCE_REQUEST,
request
)
- return(response)
+ return(response)
end
alias :mapred :map_reduce_request
alias :mr :map_reduce_request
- # Lists the buckets found in the Riak database
+ # Lists the buckets found in the Riakpb database
# @raise [ReturnRespError] if the message response does not correlate with the message requested
# @return [Array] list of buckets (String)
def buckets
response = rpc.request Util::MessageCode::LIST_BUCKETS_REQUEST
# iterate through each of the Strings in the Bucket list, returning an array of String(s)
@buckets = response.buckets.each{|b| b}
end
- # Lists the keys within their respective buckets, that are found in the Riak database
+ # Lists the keys within their respective buckets, that are found in the Riakpb database
# @param [String] bucket the bucket from which to retrieve the list of keys
# @raise [ReturnRespError] if the message response does not correlate with the message requested
# @return [Hash] Mapping of the buckets (String) to their keys (Array of Strings)
def keys_in(bucket)
list_keys_request = RpbListKeysReq.new(:bucket => bucket)
@@ -244,44 +261,13 @@
# @return [String] A representation suitable for IRB and debugging output.
# def inspect
# "#<Client >"
# end
- # Junkshot lets you throw a lot of data at riak, which will then need to be reconciled later
- # @overload junkshot(bucket, key, params)
- # @param [String] bucket the name of the bucket
- # @param [String] key the name of the key
- # @param [Hash] params the parameters that are to be updated (Needs fixin')
- # @overload junkshot(key, params)
- # @param [Key] key the Key instance to be junkshotted
- # @param [Hash] params the parameters that are to be updated (Needs fixin')
- # @return [String, Key] dependent upon whether :return_body is set to true or false
- def junkshot(*params)
- params = params.dup.flatten
- case params.size
- when 3
- bucket = params[0]
- key = params[1]
- params = params[2]
- when 2
- begin
- key = params[0]
- bucket = key.bucket
- key = key.name
- rescue NoMethodError
- raise TypeError.new t('invalid_key')
- end
- params = params[1]
- end
- self.bucket!(bucket).junkshot(key, params)
- end
- alias :stuff :junkshot
- alias :jk :junkshot
-
private
def b64encode(n)
Base64.encode64([n].pack("N")).chomp
end
end # class Client
-end # module Riak
+end # module Riakpb