module CouchPillow
  # A document stored in a key/value database through a primary connection,
  # optionally mirrored to secondary (write-only) connections.
  #
  # Attributes are declared with the `attribute` directive provided by
  # {Attributive}. Only declared attributes survive {#whitelist!}, so
  # undeclared keys are dropped on construction, update and save.
  class Document
    extend Attributive

    attr_reader :id

    # Keys managed by the Document itself; they cannot be used with {rename}.
    RESERVED_KEYS = %i[_id _type _created_at _updated_at].freeze

    # Value of `_type` when a subclass does not call {type}.
    DEFAULT_TYPE = "couchpillow".freeze

    # Events accepted by {on}.
    EVENTS = [:cas_conflict].freeze

    # Maximum number of times a write is attempted when it keeps hitting a
    # CAS conflict (first attempt included).
    CAS_CONFLICT_RETRY_COUNT = 5

    attribute :_created_at do
      required
      type Time
      auto_convert
      default { Time.now.utc }
    end

    attribute :_updated_at do
      required
      type Time
      auto_convert
      default { Time.now.utc }
    end

    # Constructor.
    #
    # @param hash The document contents. Keys are converted to symbols.
    # @param id   The id of the document. Defaults to
    #             "<doc_type>::<random hex>".
    # @param cas  CAS value of the document, from the CB client. Optional.
    #
    def initialize(hash = {}, id = "#{self.class.doc_type}::#{SecureRandom.hex}", cas = nil)
      @data = self.class.symbolize(hash)
      # Deep copy of the data as it was handed in (before timestamps, renames
      # and defaults); passed to the :cas_conflict listener as the "original".
      @original = Marshal.load(Marshal.dump(@data))
      @id = id

      time = Time.now.utc
      @data[:_created_at] ||= time
      @data[:_updated_at] = time

      @futures = []
      @cas = cas

      rename!
      whitelist!
      assign_defaults!
      auto_convert!
    end

    # Read an attribute. The key may be a String or a Symbol.
    #
    def [](key)
      @data[key.to_s.to_sym]
    end

    # Write an attribute. The key may be a String or a Symbol.
    #
    def []=(key, value)
      @data[key.to_s.to_sym] = value
    end

    # Save this document to the server.
    #
    # @param opts Options forwarded to the primary DB's `set`. Any :cas entry
    #             is replaced by this document's own CAS value. The hash
    #             passed in is not modified.
    # @return The result of the primary DB's `set`.
    #
    def save!(opts = {})
      # write to the primary db first
      result = cas_handler do
        whitelist!
        sort!
        timestamp!
        validate!
        # @cas may change between attempts, so merge it in on every attempt.
        self.class.default_db.set(@id, to_save, opts.merge(cas: @cas))
      end

      # write to the secondary only if the primary succeeds
      # and ignore CAS for secondary DBs.
      write_to_secondary_dbs do |db|
        db.set @id, to_save
      end if result

      result
    end

    # Delete this document from the server.
    #
    # @return The result of the primary DB's `delete`.
    #
    def delete!
      result = self.class.default_db.delete @id

      # write to the secondary only if the primary succeeds
      write_to_secondary_dbs do |db|
        db.delete @id
      end if result

      result
    end

    # Attempt to update this Document. Fails if this Document does not yet
    # exist in the database.
    #
    # @param opts Options forwarded to the primary DB's `replace`. Any :cas
    #             entry is replaced by this document's own CAS value. The
    #             hash passed in is not modified.
    # @return The result of the primary DB's `replace`.
    #
    def update!(opts = {})
      # write to the primary db first
      result = cas_handler do
        whitelist!
        sort!
        timestamp!
        validate!
        # @cas may change between attempts, so merge it in on every attempt.
        self.class.default_db.replace(@id, to_save, opts.merge(cas: @cas))
      end

      # write to the secondary only if the primary succeeds
      # and ignore CAS for secondary DBs.
      write_to_secondary_dbs do |db|
        db.replace @id, to_save
      end if result

      result
    end

    # Updates the attributes in the document.
    # Existing attributes will be overwritten and new ones will be added.
    # Any other existing attributes that are not present in the hash will be
    # left untouched. Renames, whitelisting and auto-conversion are re-applied
    # afterwards.
    #
    def update(hash)
      hash.each do |k, v|
        @data[k.to_sym] = v
      end
      rename!
      whitelist!
      auto_convert!
    end

    # Check if this Document has the key. The key may be a String or a
    # Symbol, consistently with {#[]} and {#[]=}.
    #
    def has?(key)
      @data.key?(key.to_s.to_sym)
    end

    # Convert this Document to a JSON string
    #
    def to_json(*a)
      to_hash.to_json(*a)
    end

    # Convert this Document to a Hash that also carries :_id and :_type.
    # Entries in the document data take precedence over those two keys.
    #
    def to_hash
      { _id: @id, _type: self.class.doc_type }.merge!(@data)
    end

    # Helper to get the type of this Document.
    # Can't really name this `type`. Need to avoid name conflict with Ruby's
    # own `type` method.
    #
    def doc_type
      self.class.doc_type
    end

    # Blocks until all pending tasks have completed.
    # Returns the result of those tasks in an array.
    #
    def wait
      results = []
      results << @futures.shift.value until @futures.empty?
      results
    end

    # Rename the keys in this Document as specified by the {rename} directive.
    #
    def rename!
      self.class.rename_keys.each do |from, to|
        # Move the value even when it is nil or false, so the old key never
        # lingers next to the new one.
        @data[to] = @data.delete(from) if @data.key?(from)
      end
    end

    # Cleanup the @data hash so it only contains declared attributes.
    #
    def whitelist!
      @data.delete_if do |k, _v|
        !self.class.attributes.has_key?(k)
      end
    end

    # Assign default values to attributes that are absent and declare one.
    #
    def assign_defaults!
      self.class.attributes.each do |k, attr|
        @data[k] = attr.trigger_default_directive if !has?(k) && attr.has_default?
      end
    end

    # Run the auto-convert directive on every attribute that is present.
    #
    def auto_convert!
      self.class.attributes.each do |k, attr|
        @data[k] = attr.trigger_auto_convert_directive(@data[k]) if has?(k)
      end
    end

    # Go through each attribute, and validate the values.
    # Validation also performs auto-conversion if auto-conversion is enabled
    # for that attribute. Missing attributes get their default, if any.
    #
    # @raise [ValidationError] if a required attribute is still missing.
    #
    def validate!
      self.class.attributes.each do |k, attr|
        if has?(k)
          @data[k] = attr.validate(@data[k])
        else
          @data[k] = attr.trigger_default_directive if attr.has_default?
          raise ValidationError, "Attribute '#{k}' is required" if attr.required? && !has?(k)
        end
      end
    end

    # Sort keys on this document.
    #
    def sort!
      @data = @data.sort.to_h
    end

    # Get a Document given an id.
    #
    # @return nil if not found or Document is of a different type.
    #
    def self.get(id)
      result, _, cas = default_db.get(id, extended: true)
      return nil unless result

      type = result[:_type] || result["_type"]
      return nil unless type == doc_type

      new(result, id, cas)
    end

    # Rename an existing key to a new key. This is invoked right after
    # initialize.
    #
    # @raise [ArgumentError] if either key is reserved, whether it is given
    #   as a String or as a Symbol.
    #
    def self.rename(from, to)
      from_key = from.to_s.to_sym
      to_key = to.to_s.to_sym
      if RESERVED_KEYS.include?(from_key) || RESERVED_KEYS.include?(to_key)
        raise ArgumentError, "Cannot rename reserved keys"
      end

      rename_keys << [from_key, to_key]
    end

    # Sets the type of this Document.
    #
    def self.type(value)
      @type = value.to_s
    end

    # Set a DB connection. Overrides the default CouchPillow.db connection
    # for the first time this method gets called. Subsequent calls will set
    # secondary connections, which will only be used for write only.
    #
    # Example:
    #   db primary_db # use for both read and write
    #   db backup_db1 # write only
    #   db backup_db2 # write only
    #
    def self.db(conn)
      # set the primary db connection
      @primary_db ||= conn

      # insert as backup db connections
      secondary_dbs << conn if conn && conn != @primary_db
    end

    # Registers a listener on a specific event.
    # See {EVENTS} constant for a list of accepted events. Events that are
    # not in that list are ignored.
    #
    def self.on(event, &block)
      event_listeners[event] = block if EVENTS.include?(event)
    end

    # The class-level helpers below are internal, but they stay public:
    # instances call them with an explicit receiver (`self.class.doc_type`
    # and friends), and a bare `private` does not apply to `def self.`
    # methods anyway.

    # Type string stored under `_type`; DEFAULT_TYPE unless {type} was called.
    def self.doc_type
      @type ||= DEFAULT_TYPE
    end

    # List of [from, to] symbol pairs registered through {rename}.
    def self.rename_keys
      @rename_keys ||= []
    end

    # Connection used for reads and primary writes. Memoized on first use.
    def self.default_db
      @default_db ||= (@primary_db || CouchPillow.db)
    end

    # Write-only connections registered through {db}.
    def self.secondary_dbs
      @secondary_dbs ||= []
    end

    # Map of event name to listener block registered through {on}.
    def self.event_listeners
      @event_listeners ||= {}
    end

    # Returns a new Hash with the (top-level) keys converted to symbols.
    def self.symbolize(hash)
      hash.each_with_object({}) do |(k, v), memo|
        memo[k.to_sym] = v
      end
    end

    private

    # Timestamp this document
    #
    def timestamp!
      @data[:_updated_at] = Time.now.utc
    end

    # Queue a future that runs the given block once per secondary DB.
    # Does nothing when no secondary DB is configured. Results are collected
    # by {#wait}.
    #
    def write_to_secondary_dbs(&block)
      return if self.class.secondary_dbs.empty?

      @futures << Celluloid::Future.new do
        self.class.secondary_dbs.each do |db|
          block.call(db)
        end
      end
    end

    # Run the given block (a write to the primary DB), resolving CAS
    # conflicts through the :cas_conflict listener and retrying.
    #
    # On Couchbase::Error::KeyExists (treated here as a CAS conflict) the
    # current document is fetched again, the listener is called with
    # (original data, data currently in the DB, data we tried to save), its
    # return value becomes the new @data, and the block is attempted again
    # with the freshly fetched CAS.
    #
    # @raise [CASError] if the DB yields no document, no listener is
    #   registered, or the retries are exhausted.
    #
    def cas_handler
      rtcount = CAS_CONFLICT_RETRY_COUNT
      begin
        yield
      rescue ValidationError
        raise
      rescue Couchbase::Error::KeyExists
        other_doc, _, newcas = self.class.default_db.get(@id, extended: true)
        raise CASError, "There is a CAS conflict, but DB does not yield a document" unless other_doc

        handler = self.class.event_listeners[:cas_conflict]
        raise CASError, "There is a CAS conflict, but no :cas_conflict handler has been defined. See 'on' directive." unless handler

        # resolve conflict
        @data = handler.call(@original, self.class.symbolize(other_doc), to_save)
        @cas = newcas

        rtcount -= 1
        raise CASError, "Exhausted retries" if rtcount == 0
        retry
      end
    end

    # Get the final hash that will be saved to the database.
    # The result is cached and only rebuilt when the hash code of @data
    # differs from the one seen when the cache was built.
    #
    def to_save
      current = @data.hash
      return @tos if @tos && @toshash && @toshash == current

      @tos = @data.merge(_type: self.class.doc_type)
      @toshash = current
      @tos
    end
  end
end