lib/sohm.rb in sohm-0.10.1 vs lib/sohm.rb in sohm-0.10.2
- old
+ new
@@ -106,10 +106,23 @@
def self.mutex
@mutex
end
+ # If this is true, Sohm::Model#save would refresh all indices inline
+ # when called, this might hurt performance in certain cases. You can
+ # use this flag here to disable this inline refreshing, then use external
+ # background workers to refresh indices by calling Sohm::Model#refresh_indices
+ @refresh_indices_inline = true
+ def self.refresh_indices_inline=(flag)
+ @refresh_indices_inline = flag
+ end
+
+ def self.refresh_indices_inline
+ @refresh_indices_inline
+ end
+
# By default, EVALSHA is used
def self.enable_evalsha
defined?(@enable_evalsha) ? @enable_evalsha : true
end
@@ -1058,11 +1071,12 @@
#
def save
if serial_attributes_changed
response = script(LUA_SAVE, 1, key,
sanitize_attributes(serial_attributes).to_msgpack,
- cas_token)
+ cas_token,
+ sanitize_attributes(attributes).to_msgpack)
if response.is_a?(RuntimeError)
if response.message =~ /cas_error/
raise CasViolation
else
@@ -1070,20 +1084,66 @@
end
end
@cas_token = response
@serial_attributes_changed = false
+ else
+ redis.call("HSET", key, "_ndata",
+ sanitize_attributes(attributes).to_msgpack)
end
- redis.call("HSET", key, "_ndata",
- sanitize_attributes(attributes).to_msgpack)
+ if Sohm.refresh_indices_inline
+ refresh_indices
+ end
- refresh_indices
-
return self
end
+ # Refresh model indices
+ def refresh_indices
+ memo_key = key["_indices"]
+ # Add new indices first
+ commands = fetch_indices.each_pair.map do |field, vals|
+ vals.map do |val|
+ index_key = model.key["_indices"][field][val]
+ [["SADD", memo_key, index_key], ["SADD", index_key, id]]
+ end
+ end.flatten(2)
+
+ model.synchronize do
+ commands.each do |command|
+ redis.queue(*command)
+ end
+ redis.commit
+ end
+
+ # Remove old indices
+ index_set = ::Set.new(redis.call("SMEMBERS", memo_key))
+ # Here we are fetching the latest model to avoid concurrency issue
+ valid_list = model[id].send(:fetch_indices).each_pair.map do |field, vals|
+ vals.map do |val|
+ model.key["_indices"][field][val]
+ end
+ end.flatten(1)
+ valid_set = ::Set.new(valid_list)
+ diff_set = index_set - valid_set
+ if diff_set.size > 0
+ diff_list = diff_set.to_a
+ commands = diff_list.map do |key|
+ ["SREM", key, id]
+ end + [["SREM", memo_key] + diff_list]
+
+ model.synchronize do
+ commands.each do |command|
+ redis.queue(*command)
+ end
+ redis.commit
+ end
+ end
+ true
+ end
+
# Delete the model, including all the following keys:
#
# - <Model>:<id>
# - <Model>:<id>:_counters
# - <Model>:<id>:<set name>
@@ -1211,51 +1271,9 @@
def fetch_indices
indices = {}
model.indices.each { |field| indices[field] = Array(send(field)) }
indices
- end
-
- # This is performed asynchronously
- def refresh_indices
- memo_key = key["_indices"]
- # Add new indices first
- commands = fetch_indices.each_pair.map do |field, vals|
- vals.map do |val|
- index_key = model.key["_indices"][field][val]
- [["SADD", memo_key, index_key], ["SADD", index_key, id]]
- end
- end.flatten(2)
-
- # TODO: Think about switching to a redis pool later
- model.synchronize do
- commands.each do |command|
- redis.queue(*command)
- end
- redis.commit
- end
-
- # Remove old indices
- # TODO: we can do this asynchronously, or maybe in a background queue
- index_set = ::Set.new(redis.call("SMEMBERS", memo_key))
- valid_list = model[id].send(:fetch_indices).each_pair.map do |field, vals|
- vals.map do |val|
- model.key["_indices"][field][val]
- end
- end.flatten(1)
- valid_set = ::Set.new(valid_list)
- diff_set = index_set - valid_set
- diff_list = diff_set.to_a
- commands = diff_list.map do |key|
- ["SREM", key, id]
- end + [["SREM", memo_key] + diff_list]
-
- model.synchronize do
- commands.each do |command|
- redis.queue(*command)
- end
- redis.commit
- end
end
# Unpack hash returned by redis, which contains _cas, _sdata, _ndata
# columns
def unpack_attrs(attrs)