# Copyright 2010 Sean Cribbs, Sonian Inc., and Basho Technologies, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
require 'riak'
require 'set'

module Riak
  # Parent class of all object types supported by ripple. {Riak::RObject} represents
  # the data and metadata stored in a bucket/key pair in the Riak database.
  class RObject
    include Util
    include Util::Translation
    include Util::Escape

    # @return [Bucket] the bucket in which this object is contained
    attr_accessor :bucket

    # @return [String] the key of this object within its bucket
    attr_accessor :key

    # @return [String] the MIME content type of the object
    attr_accessor :content_type

    # @return [String] the Riak vector clock for the object
    attr_accessor :vclock
    alias_attribute :vector_clock, :vclock

    # @return [Object] the data stored in Riak at this object's key. Varies in format by content-type, defaulting to String from the response body.
    attr_accessor :data

    # @return [Set] a Set of {Riak::Link} objects for relationships between this object and other resources
    attr_accessor :links

    # @return [String] the ETag header from the most recent HTTP response, useful for caching and reloading
    attr_accessor :etag

    # @return [Time] the Last-Modified header from the most recent HTTP response, useful for caching and reloading
    attr_accessor :last_modified

    # @return [Hash] a hash of any X-Riak-Meta-* headers that were in the HTTP response, keyed on the trailing portion
    attr_accessor :meta

    # Create a new object manually
    # @param [Bucket] bucket the bucket in which the object exists
    # @param [String] key the key at which the object resides. If nil, a key will be assigned when the object is saved.
    # @yield [self] the new object, for further initialization, when a block is given
    # @see Bucket#get
    def initialize(bucket, key=nil)
      @bucket, @key = bucket, key
      @links, @meta = Set.new, {}
      yield self if block_given?
    end

    # Load object data from an HTTP response
    # @param [Hash] response a response from {Riak::Client::HTTPBackend}. This method reads
    #   response[:headers], response[:code] and response[:body].
    # @return [Riak::RObject] self
    def load(response)
      extract_header(response, "location", :key) {|v| URI.unescape(v.split("/").last) }
      extract_header(response, "content-type", :content_type)
      extract_header(response, "x-riak-vclock", :vclock)
      extract_header(response, "link", :links) {|v| Set.new(Link.parse(v)) }
      extract_header(response, "etag", :etag)
      extract_header(response, "last-modified", :last_modified) {|v| Time.httpdate(v) }
      # Header values are stored exactly as received (not unwrapped with #first).
      @meta = response[:headers].inject({}) do |h,(k,v)|
        if k =~ /x-riak-meta-(.*)/
          h[$1] = v
        end
        h
      end
      @conflict = response[:code].try(:to_i) == 300 && content_type =~ /multipart\/mixed/
      # Drop any memoized siblings; they are re-parsed lazily from the new body.
      @siblings = nil
      # Must run after @content_type and @meta are set: deserialize dispatches on both.
      @data = deserialize(response[:body]) if response[:body].present?
      self
    end

    # HTTP header hash that will be sent along when storing the object
    # @return [Hash] hash of HTTP Headers
    def store_headers
      {}.tap do |hash|
        hash["Content-Type"] = @content_type
        hash["X-Riak-Vclock"] = @vclock if @vclock
        unless @links.blank?
          # Links with rel "up" are not sent back to the server.
          hash["Link"] = @links.reject {|l| l.rel == "up" }.map(&:to_s).join(", ")
        end
        unless @meta.blank?
          @meta.each do |k,v|
            # Meta values captured by #load are stored as received, which appears to be
            # an Array of header values (other headers are unwrapped with #first).
            # Array#to_s is inspect-style on Ruby >= 1.9 (e.g. '["value"]'), so join
            # array-like values explicitly instead of relying on #to_s.
            hash["X-Riak-Meta-#{k}"] = v.respond_to?(:join) ? v.join(", ") : v.to_s
          end
        end
      end
    end

    # HTTP header hash that will be sent along when reloading the object
    # @return [Hash] hash of HTTP headers
    def reload_headers
      {}.tap do |h|
        h['If-None-Match'] = @etag if @etag.present?
        h['If-Modified-Since'] = @last_modified.httpdate if @last_modified.present?
      end
    end

    # Store the object in Riak
    # @param [Hash] options query parameters
    # @option options [Fixnum] :r the "r" parameter (Read quorum for the implicit read performed when validating the store operation)
    # @option options [Fixnum] :w the "w" parameter (Write quorum)
    # @option options [Fixnum] :dw the "dw" parameter (Durable-write quorum)
    # @option options [Boolean] :returnbody (true) whether to return the result of a successful write in the body of the response. Set to false for fire-and-forget updates, set to true to immediately have access to the object's stored representation.
    # @return [Riak::RObject] self
    # @raise [ArgumentError] if the content_type is not defined
    def store(options={})
      raise ArgumentError, t("content_type_undefined") unless @content_type.present?
      params = {:returnbody => true}.merge(options)
      # PUT to bucket/key when the key is known; otherwise POST to the bucket.
      method, codes, path = @key.present? ? [:put, [200,204,300], "#{escape(@bucket.name)}/#{escape(@key)}"] : [:post, 201, escape(@bucket.name)]
      response = @bucket.client.http.send(method, codes, @bucket.client.prefix, path, params, serialize(data), store_headers)
      load(response)
    end

    # Reload the object from Riak.  Will use conditional GETs when possible.
    # @param [Hash] options query parameters
    # @option options [Fixnum] :r the "r" parameter (Read quorum)
    # @option options [Boolean] :force will force a reload request if the vclock is not present, useful for reloading the object after a store (not passed in the query params)
    # @return [Riak::RObject] self
    def reload(options={})
      # Work on a copy so stripping :force never mutates (or fails on a frozen) caller hash.
      options = options.dup
      force = options.delete(:force)
      return self unless @key && (@vclock || force)
      codes = @bucket.allow_mult ? [200,300,304] : [200,304]
      response = @bucket.client.http.get(codes, @bucket.client.prefix, escape(@bucket.name), escape(@key), options, reload_headers)
      # 304 Not Modified: keep the current state.
      load(response) unless response[:code] == 304
      self
    end

    alias :fetch :reload

    # Delete the object from Riak and freeze this instance. Will work whether or not the object actually
    # exists in the Riak database.
    # @return [Riak::RObject, nil] the frozen object, or nil (without issuing a request) when the key is blank
    def delete
      return if key.blank?
      @bucket.client.http.delete([204,404], @bucket.client.prefix, escape(@bucket.name), escape(@key))
      freeze
    end

    # Returns sibling objects when in conflict.
    # @return [Array] an array of conflicting sibling objects for this key
    # @return [self] this object when not in conflict
    def siblings
      return self unless conflict?
      @siblings ||= Multipart.parse(data, Multipart.extract_boundary(content_type)).map do |part|
        RObject.new(self.bucket, self.key) do |sibling|
          sibling.load(part)
          # Every sibling is given this object's vclock.
          sibling.vclock = vclock
        end
      end
    end

    # @return [true,false] Whether this object has conflicting sibling objects (divergent vclocks)
    def conflict?
      @conflict.present?
    end

    # Serializes the internal object data for sending to Riak. Differs based on the content-type.
    # This method is called internally when storing the object.
    # Automatically serialized formats:
    # * JSON (application/json)
    # * YAML (text/yaml)
    # * Marshal (application/octet-stream if meta['ruby-serialization'] == "Marshal")
    # @param [Object] payload the data to serialize
    # @return [String, IO] the serialized payload; IO payloads are returned untouched
    def serialize(payload)
      return payload if IO === payload
      case @content_type
      when /json/
        ActiveSupport::JSON.encode(payload)
      when /yaml/
        YAML.dump(payload)
      when "application/octet-stream"
        if @meta['ruby-serialization'] == "Marshal"
          Marshal.dump(payload)
        else
          payload.to_s
        end
      else
        payload.to_s
      end
    end

    # Deserializes the internal object data from a Riak response. Differs based on the content-type.
    # This method is called internally when loading the object.
    # Automatically deserialized formats:
    # * JSON (application/json)
    # * YAML (text/yaml)
    # * Marshal (application/octet-stream if meta['ruby-serialization'] == "Marshal")
    # @param [String] body the serialized response body
    # @return [Object] the decoded data, or the body itself for unrecognized content types
    def deserialize(body)
      case @content_type
      when /json/
        ActiveSupport::JSON.decode(body)
      when /yaml/
        # SECURITY NOTE(review): YAML.load on a server-supplied body can instantiate
        # arbitrary objects; only safe when everything stored in the cluster is trusted.
        YAML.load(body)
      when "application/octet-stream"
        if @meta['ruby-serialization'] == "Marshal"
          # SECURITY NOTE(review): Marshal.load on a server-supplied body can execute
          # arbitrary code; only safe when everything stored in the cluster is trusted.
          Marshal.load(body)
        else
          body
        end
      else
        body
      end
    end

    # @return [String] A representation suitable for IRB and debugging output
    def inspect
      "#<#{self.class.name} #{@bucket.client.http.path(@bucket.client.prefix, escape(@bucket.name), escape(@key)).to_s} [#{@content_type}]:#{@data.inspect}>"
    end

    # Walks links from this object to other objects in Riak.
    # @param params walk specifications, passed through to WalkSpec.normalize
    # @return [Array<Array>] one array per multipart group in the response; each contains
    #   the loaded objects (or nil for parts that could not be mapped). An empty array is
    #   returned when no multipart boundary is found in the response content-type.
    def walk(*params)
      specs = WalkSpec.normalize(*params)
      response = @bucket.client.http.get(200, @bucket.client.prefix, escape(@bucket.name), escape(@key), specs.join("/"))
      if boundary = Multipart.extract_boundary(response[:headers]['content-type'].first)
        Multipart.parse(response[:body], boundary).map do |group|
          map_walk_group(group)
        end
      else
        []
      end
    end

    # Converts the object to a link suitable for linking other objects to it
    # @param [String] tag the tag to give the new link (optional)
    # @return [Link] a link whose URL is this object's resource path
    def to_link(tag=nil)
      Link.new(@bucket.client.http.path(@bucket.client.prefix, escape(@bucket.name), escape(@key)).path, tag)
    end

    private

    # Copies a single response header into an attribute of this object.
    # Does nothing when the header is absent or blank.
    # @param [Hash] response the response whose :headers hash is read
    # @param [String] name the header name to look up
    # @param [Symbol] attribute the attribute to assign through its writer; nothing is assigned when nil
    # @yield [value] optional conversion applied to the first header value before assignment
    def extract_header(response, name, attribute=nil)
      if response[:headers][name].present?
        value = response[:headers][name].try(:first)
        value = yield value if block_given?
        send("#{attribute}=", value) if attribute
      end
    end

    # Converts one multipart group from a link-walk response into objects.
    # @param [Array<Hash>] group the parsed parts of a single group
    # @return [Array] a loaded RObject for each part that has headers, a body and a
    #   location header; nil for every other part
    def map_walk_group(group)
      group.map do |obj|
        if obj[:headers] && obj[:body] && obj[:headers]['location']
          # The last two path segments of the location are taken as bucket and key.
          # Both stay nil when the location does not match the pattern.
          bucket, key = $1, $2 if obj[:headers]['location'].first =~ %r{/.*/(.*)/(.*)$}
          RObject.new(@bucket.client.bucket(bucket, :keys => false), key).load(obj)
        end
      end
    end
  end
end