lib/sohm.rb in sohm-0.10.1 vs lib/sohm.rb in sohm-0.10.2

- old
+ new

@@ -106,10 +106,23 @@ def self.mutex @mutex end + # If this is true, Sohm::Model#save would refresh all indices inline + # when called, this might hurt performance in certain cases. You can + # use this flag here to disable this inline refreshing, then use external + # background workers to refresh indices by calling Sohm::Model#refresh_indices + @refresh_indices_inline = true + def self.refresh_indices_inline=(flag) + @refresh_indices_inline = flag + end + + def self.refresh_indices_inline + @refresh_indices_inline + end + # By default, EVALSHA is used def self.enable_evalsha defined?(@enable_evalsha) ? @enable_evalsha : true end @@ -1058,11 +1071,12 @@ # def save if serial_attributes_changed response = script(LUA_SAVE, 1, key, sanitize_attributes(serial_attributes).to_msgpack, - cas_token) + cas_token, + sanitize_attributes(attributes).to_msgpack) if response.is_a?(RuntimeError) if response.message =~ /cas_error/ raise CasViolation else @@ -1070,20 +1084,66 @@ end end @cas_token = response @serial_attributes_changed = false + else + redis.call("HSET", key, "_ndata", + sanitize_attributes(attributes).to_msgpack) end - redis.call("HSET", key, "_ndata", - sanitize_attributes(attributes).to_msgpack) + if Sohm.refresh_indices_inline + refresh_indices + end - refresh_indices - return self end + # Refresh model indices + def refresh_indices + memo_key = key["_indices"] + # Add new indices first + commands = fetch_indices.each_pair.map do |field, vals| + vals.map do |val| + index_key = model.key["_indices"][field][val] + [["SADD", memo_key, index_key], ["SADD", index_key, id]] + end + end.flatten(2) + + model.synchronize do + commands.each do |command| + redis.queue(*command) + end + redis.commit + end + + # Remove old indices + index_set = ::Set.new(redis.call("SMEMBERS", memo_key)) + # Here we are fetching the latest model to avoid concurrency issue + valid_list = model[id].send(:fetch_indices).each_pair.map do |field, vals| + vals.map do |val| + model.key["_indices"][field][val] + end + end.flatten(1) + valid_set = ::Set.new(valid_list) + diff_set = index_set - valid_set + if diff_set.size > 0 + diff_list = diff_set.to_a + commands = diff_list.map do |key| + ["SREM", key, id] + end + [["SREM", memo_key] + diff_list] + + model.synchronize do + commands.each do |command| + redis.queue(*command) + end + redis.commit + end + end + true + end + # Delete the model, including all the following keys: # # - <Model>:<id> # - <Model>:<id>:_counters # - <Model>:<id>:<set name> @@ -1211,51 +1271,9 @@ def fetch_indices indices = {} model.indices.each { |field| indices[field] = Array(send(field)) } indices - end - - # This is performed asynchronously - def refresh_indices - memo_key = key["_indices"] - # Add new indices first - commands = fetch_indices.each_pair.map do |field, vals| - vals.map do |val| - index_key = model.key["_indices"][field][val] - [["SADD", memo_key, index_key], ["SADD", index_key, id]] - end - end.flatten(2) - - # TODO: Think about switching to a redis pool later - model.synchronize do - commands.each do |command| - redis.queue(*command) - end - redis.commit - end - - # Remove old indices - # TODO: we can do this asynchronously, or maybe in a background queue - index_set = ::Set.new(redis.call("SMEMBERS", memo_key)) - valid_list = model[id].send(:fetch_indices).each_pair.map do |field, vals| - vals.map do |val| - model.key["_indices"][field][val] - end - end.flatten(1) - valid_set = ::Set.new(valid_list) - diff_set = index_set - valid_set - diff_list = diff_set.to_a - commands = diff_list.map do |key| - ["SREM", key, id] - end + [["SREM", memo_key] + diff_list] - - model.synchronize do - commands.each do |command| - redis.queue(*command) - end - redis.commit - end end # Unpack hash returned by redis, which contains _cas, _sdata, _ndata # columns def unpack_attrs(attrs)