lib/couchpillow/document.rb in couchpillow-0.4.4 vs lib/couchpillow/document.rb in couchpillow-0.4.5

- old
+ new

@@ -8,11 +8,15 @@ RESERVED_KEYS = %i[_id _type _created_at _updated_at] DEFAULT_TYPE = "couchpillow".freeze + EVENTS = [ :cas_conflict ].freeze + CAS_CONFLICT_RETRY_COUNT = 5 + + attribute :_created_at do required type Time auto_convert default { Time.now.utc } @@ -24,20 +28,32 @@ auto_convert default { Time.now.utc } end - def initialize hash = {}, id = "#{self.class.doc_type}::#{SecureRandom.hex}" + # Constructor. + # @param hash The document + # @param id The id of the document + # @param cas CAS value of the document, from the CB client. Optional. + # @param flags Flags of the document, from the CB client. Optional. + # + def initialize hash = {}, + id = "#{self.class.doc_type}::#{SecureRandom.hex}", + cas = nil + @data = self.class.symbolize(hash) + @original = Marshal.load(Marshal.dump(@data)) @id = id time = Time.now.utc @data[:_created_at] ||= time @data[:_updated_at] = time @futures = [] + @cas = cas + rename! whitelist! assign_defaults! auto_convert! end @@ -54,72 +70,68 @@ # Save this document to the server # def save! opts = {} - whitelist! - sort! - timestamp! - validate! - to_save = @data.merge({ - :_type => self.class.doc_type - }) + result = nil - # write to all connections - result = self.class.default_db.set(@id, to_save, opts) - - unless self.class.secondary_dbs.empty? - @futures << Celluloid::Future.new do - self.class.secondary_dbs.each do |db| - db.set(@id, to_save, opts) - end - end + # write to the primary db first + result = cas_handler do + whitelist! + sort! + timestamp! + validate! + opts[:cas] = @cas + self.class.default_db.set(@id, to_save, opts) end + # write to the secondary only if the primary succeeds + # and ignore CAS for secondary DBs. + write_to_secondary_dbs do |db| + db.set @id, to_save + end if result + result end # Delete this document from the server. # def delete! result = self.class.default_db.delete @id - unless self.class.secondary_dbs.empty? - @futures << Celluloid::Future.new do - self.class.secondary_dbs.each do |db| - db.delete @id - end - end - end + # write to the secondary only if the primary succeeds + write_to_secondary_dbs do |db| + db.delete @id + end if result result end # Attempt to update this Document. Fails if this Document does not yet # exist in the database. # - def update! - whitelist! - sort! - timestamp! - validate! - to_save = @data.merge({ - :_type => self.class.doc_type - }) + def update! opts = {} - result = self.class.default_db.replace @id, to_save - - unless self.class.secondary_dbs.empty? - @futures << Celluloid::Future.new do - self.class.secondary_dbs.each do |db| - db.replace @id, to_save - end - end + # write to the primary db first + result = cas_handler do + whitelist! + sort! + timestamp! + validate! + opts[:cas] = @cas + result = self.class.default_db.replace(@id, to_save, opts) end + # write to the secondary only if the primary succeeds + # and ignore CAS for secondary DBs. + opts.delete :cas + write_to_secondary_dbs do |db| + db.replace @id, to_save + end if result + result end # Updates the attributes in the document. @@ -242,14 +254,16 @@ # Get a Document given an id. # # @return nil if not found or Document is of a different type. # def self.get id - result = default_db.get(id) and + result, _, cas = default_db.get(id, extended: true) + + result and type = result[:_type] || result["_type"] and type == doc_type and - new(result, id) or + new(result, id, cas) or nil end # Rename an existing key to a new key. This is invoked right after @@ -288,20 +302,81 @@ secondary_dbs << conn end end + # Registers a listener on a specific event. + # See {EVENTS} constant for a list of accepted events. + # + def self.on event, &block + event_listeners[event] = block if EVENTS.include?(event) + end + + private # Timestamp this document # def timestamp! @data[:_updated_at] = Time.now.utc end + def write_to_secondary_dbs &block + unless self.class.secondary_dbs.empty? + @futures << Celluloid::Future.new do + self.class.secondary_dbs.each do |db| + block.call(db) + end + end + end + end + + + def cas_handler &block + # write to the primary db first + rtcount = CAS_CONFLICT_RETRY_COUNT + begin + block.call + + rescue ValidationError + raise + + rescue Couchbase::Error::KeyExists + other_doc, _, newcas = self.class.default_db.get(@id, extended: true) + raise CASError, "There is a CAS conflict, but DB does not yield a document" unless other_doc + raise CASError, "There is a CAS conflict, but no :cas_conflict handler has been defined. See 'on' directive." unless self.class.event_listeners[:cas_conflict] + + # resolve conflict + other_doc = self.class.symbolize(other_doc) + @data = self.class.event_listeners[:cas_conflict].call(@original, other_doc, to_save) + @cas = newcas + + rtcount -= 1 + raise CASError, "Exhausted retries" if rtcount == 0 + retry + + end + end + + + # Get the final hash that will be saved to the database. + # Check if data has been modified. + # + def to_save + hash = @data.hash + return @tos if @tos && @toshash && @toshash == hash + + @tos = @data.merge({ + :_type => self.class.doc_type + }) + @toshash = @data.hash + @tos + end + + def self.doc_type @type ||= DEFAULT_TYPE end @@ -315,9 +390,14 @@ end def self.secondary_dbs @secondary_dbs ||= [] + end + + + def self.event_listeners + @event_listeners ||= {} end def self.symbolize hash hash.inject({}) do |memo,(k,v)|