lib/couchpillow/document.rb in couchpillow-0.4.4 vs lib/couchpillow/document.rb in couchpillow-0.4.5
- old
+ new
@@ -8,11 +8,15 @@
RESERVED_KEYS = %i[_id _type _created_at _updated_at]
DEFAULT_TYPE = "couchpillow".freeze
+ EVENTS = [ :cas_conflict ].freeze
+ CAS_CONFLICT_RETRY_COUNT = 5
+
+
attribute :_created_at do
required
type Time
auto_convert
default { Time.now.utc }
@@ -24,20 +28,32 @@
auto_convert
default { Time.now.utc }
end
- def initialize hash = {}, id = "#{self.class.doc_type}::#{SecureRandom.hex}"
+ # Constructor.
+ # @param hash The document
+ # @param id The id of the document
+ # @param cas CAS value of the document, from the CB client. Optional.
+ # @param flags Flags of the document, from the CB client. Optional.
+ #
+ def initialize hash = {},
+ id = "#{self.class.doc_type}::#{SecureRandom.hex}",
+ cas = nil
+
@data = self.class.symbolize(hash)
+ @original = Marshal.load(Marshal.dump(@data))
@id = id
time = Time.now.utc
@data[:_created_at] ||= time
@data[:_updated_at] = time
@futures = []
+ @cas = cas
+
rename!
whitelist!
assign_defaults!
auto_convert!
end
@@ -54,72 +70,68 @@
# Save this document to the server
#
def save! opts = {}
- whitelist!
- sort!
- timestamp!
- validate!
- to_save = @data.merge({
- :_type => self.class.doc_type
- })
+ result = nil
- # write to all connections
- result = self.class.default_db.set(@id, to_save, opts)
-
- unless self.class.secondary_dbs.empty?
- @futures << Celluloid::Future.new do
- self.class.secondary_dbs.each do |db|
- db.set(@id, to_save, opts)
- end
- end
+ # write to the primary db first
+ result = cas_handler do
+ whitelist!
+ sort!
+ timestamp!
+ validate!
+ opts[:cas] = @cas
+ self.class.default_db.set(@id, to_save, opts)
end
+ # write to the secondary only if the primary succeeds
+ # and ignore CAS for secondary DBs.
+ write_to_secondary_dbs do |db|
+ db.set @id, to_save
+ end if result
+
result
end
# Delete this document from the server.
#
def delete!
result = self.class.default_db.delete @id
- unless self.class.secondary_dbs.empty?
- @futures << Celluloid::Future.new do
- self.class.secondary_dbs.each do |db|
- db.delete @id
- end
- end
- end
+ # write to the secondary only if the primary succeeds
+ write_to_secondary_dbs do |db|
+ db.delete @id
+ end if result
result
end
# Attempt to update this Document. Fails if this Document does not yet
# exist in the database.
#
- def update!
- whitelist!
- sort!
- timestamp!
- validate!
- to_save = @data.merge({
- :_type => self.class.doc_type
- })
+ def update! opts = {}
- result = self.class.default_db.replace @id, to_save
-
- unless self.class.secondary_dbs.empty?
- @futures << Celluloid::Future.new do
- self.class.secondary_dbs.each do |db|
- db.replace @id, to_save
- end
- end
+ # write to the primary db first
+ result = cas_handler do
+ whitelist!
+ sort!
+ timestamp!
+ validate!
+ opts[:cas] = @cas
+ result = self.class.default_db.replace(@id, to_save, opts)
end
+ # write to the secondary only if the primary succeeds
+ # and ignore CAS for secondary DBs.
+ opts.delete :cas
+ write_to_secondary_dbs do |db|
+ db.replace @id, to_save
+ end if result
+
result
end
# Updates the attributes in the document.
@@ -242,14 +254,16 @@
# Get a Document given an id.
#
# @return nil if not found or Document is of a different type.
#
def self.get id
- result = default_db.get(id) and
+ result, _, cas = default_db.get(id, extended: true)
+
+ result and
type = result[:_type] || result["_type"] and
type == doc_type and
- new(result, id) or
+ new(result, id, cas) or
nil
end
# Rename an existing key to a new key. This is invoked right after
@@ -288,20 +302,81 @@
secondary_dbs << conn
end
end
+ # Registers a listener on a specific event.
+ # See {EVENTS} constant for a list of accepted events.
+ #
+ def self.on event, &block
+ event_listeners[event] = block if EVENTS.include?(event)
+ end
+
+
private
# Timestamp this document
#
def timestamp!
@data[:_updated_at] = Time.now.utc
end
+ def write_to_secondary_dbs &block
+ unless self.class.secondary_dbs.empty?
+ @futures << Celluloid::Future.new do
+ self.class.secondary_dbs.each do |db|
+ block.call(db)
+ end
+ end
+ end
+ end
+
+
+ def cas_handler &block
+ # write to the primary db first
+ rtcount = CAS_CONFLICT_RETRY_COUNT
+ begin
+ block.call
+
+ rescue ValidationError
+ raise
+
+ rescue Couchbase::Error::KeyExists
+ other_doc, _, newcas = self.class.default_db.get(@id, extended: true)
+ raise CASError, "There is a CAS conflict, but DB does not yield a document" unless other_doc
+ raise CASError, "There is a CAS conflict, but no :cas_conflict handler has been defined. See 'on' directive." unless self.class.event_listeners[:cas_conflict]
+
+ # resolve conflict
+ other_doc = self.class.symbolize(other_doc)
+ @data = self.class.event_listeners[:cas_conflict].call(@original, other_doc, to_save)
+ @cas = newcas
+
+ rtcount -= 1
+ raise CASError, "Exhausted retries" if rtcount == 0
+ retry
+
+ end
+ end
+
+
+ # Get the final hash that will be saved to the database.
+ # Check if data has been modified.
+ #
+ def to_save
+ hash = @data.hash
+ return @tos if @tos && @toshash && @toshash == hash
+
+ @tos = @data.merge({
+ :_type => self.class.doc_type
+ })
+ @toshash = @data.hash
+ @tos
+ end
+
+
def self.doc_type
@type ||= DEFAULT_TYPE
end
@@ -315,9 +390,14 @@
end
def self.secondary_dbs
@secondary_dbs ||= []
+ end
+
+
+ def self.event_listeners
+ @event_listeners ||= {}
end
def self.symbolize hash
hash.inject({}) do |memo,(k,v)|