require 'digest/sha1'

module Rhoconnect
  # Raised when a distributed Redis lock expires before its holder
  # releases it (see Store.get_lock / Store.release_lock).
  class StoreLockException < RuntimeError; end

  # Redis-backed document store.
  #
  # Structured docs are sharded into "buckets" (Redis sets) keyed by the
  # first two hex digits of the SHA1 of each object id; the hash at
  # "#{dockey}:indices" tracks which buckets exist for a doc. Individual
  # attributes are packed/unpacked with the setelement/getelement helpers
  # defined elsewhere in the project.
  class Store
    # Attribute names that must never be persisted as object elements.
    RESERVED_ATTRIB_NAMES = ["attrib_type", "id"] unless defined? RESERVED_ATTRIB_NAMES

    @@db = nil

    class << self
      # Shared Redis connection, lazily created on first use.
      def db; @@db || @@db = _get_redis end

      def db=(server=nil)
        @@db = _get_redis(server)
      end

      # Connects to Redis (no-op if already connected) and validates the
      # resulting client object.
      def create(server=nil)
        @@db ||= _get_redis(server)
        raise "Error connecting to Redis store." unless @@db and (@@db.is_a?(Redis) or @@db.is_a?(Redis::Client))
      end

      def start_transaction
        @@db.multi
      end

      def execute_transaction
        @@db.exec
      end

      # Redis type ('string', 'list', 'set', ...) stored at dockey.
      def doc_type(dockey)
        @@db.type(dockey) if dockey
      end

      # Stores either a plain string value or structured data (Hash/Array)
      # under dockey; append only applies to structured data.
      def set_db_doc(dockey, data, append=false)
        if data.is_a?(String)
          put_value(dockey, data)
        else
          put_data(dockey, data, append)
        end
      end

      # Fetches a doc as JSON (raw string for 'string' keys).
      def get_db_doc(dockey)
        doctype = Store.doc_type(dockey)
        if doctype == 'string'
          Store.get_value(dockey)
        elsif doctype == 'list'
          Store.get_data(dockey, Array).to_json
        else
          Store.get_data(dockey).to_json
        end
      end

      def put_object(dockey, key, data={})
        _put_object(dockey, key, data)
      end

      # Adds set with given data, replaces existing set
      # if it exists or appends data to the existing set
      # if append flag set to true
      def put_data(dockey,data={},append=false)
        if dockey and data
          flash_data(dockey) unless append
          # Inserts a hash or array
          if data.is_a?(Hash)
            @@db.pipelined do
              data.each do |key,value|
                raise ArgumentError, "Invalid value object: #{value.inspect}. Hash is expected." unless value.is_a?(Hash)
                _put_object(dockey, key, value)
              end
            end
          else
            put_list(dockey,data,append)
          end
        end
        true
      end

      # Stores an Array as a Redis list, replacing it unless append is set.
      def put_list(dockey, data=[], append=false)
        if dockey and data
          flash_data(dockey) unless append
          @@db.pipelined do
            data.each do |value|
              @@db.rpush(dockey, value)
            end
          end
        end
        true
      end

      # updates objects for a given doctype, source, user
      # create new objects if necessary
      # Returns the number of newly created objects.
      def update_objects(dockey, data={})
        return 0 unless dockey and data
        new_object_count = 0
        objs = get_objects(dockey, data.keys)
        @@db.pipelined do
          data.each do |key,value|
            is_create = objs[key].nil?
            new_object_count += 1 if is_create
            obj_bucket = _add_bucket_index(dockey, "#{_create_obj_index(key)}")
            # renamed from |attrib,value| - the inner param shadowed the
            # outer `value`; behavior is unchanged
            value.each do |attrib,attrib_value|
              next if _is_reserved?(attrib, attrib_value)
              new_element = setelement(key,attrib,attrib_value)
              element_exists = is_create ? false : objs[key].has_key?(attrib)
              if element_exists
                existing_element = setelement(key,attrib,objs[key][attrib])
                # only touch Redis when the attribute actually changed
                if existing_element != new_element
                  @@db.srem(obj_bucket, existing_element)
                  @@db.sadd(obj_bucket, new_element)
                end
              else
                @@db.sadd(obj_bucket, new_element)
              end
            end
          end
        end
        new_object_count
      end

      # Removes objects from a given doctype,source,user
      # Returns the number of objects removed.
      def delete_objects(dockey,data=[])
        return 0 unless dockey and data
        deleted_object_count = 0
        objs = get_objects(dockey, data)
        @@db.pipelined do
          data.each do |id|
            if _delete_object(dockey, id, objs[id])
              deleted_object_count += 1
            end
          end
        end
        _cleanup_buckets(dockey, objs.keys)
        deleted_object_count
      end

      # Deletes data from a given doctype,source,user
      def delete_data(dockey,data={})
        if dockey and data
          @@db.pipelined do
            data.each do |key,value|
              _delete_object(dockey, key, value)
            end
          end
          _cleanup_buckets(dockey, data.keys)
        end
        true
      end

      # Adds a simple key/value pair; a nil value deletes the key.
      def put_value(dockey,value)
        if dockey
          if value
            @@db.set(dockey,value.to_s)
          else
            @@db.del(dockey)
          end
        end
      end

      # Retrieves value for a given key
      def get_value(dockey)
        @@db.get(dockey) if dockey
      end

      def incr(dockey)
        @@db.incr(dockey)
      end

      def decr(dockey)
        @@db.decr(dockey)
      end

      def update_count(dockey, count)
        Store.db.incrby(dockey, count)
      end

      def get_object(dockey, key)
        _get_object(dockey, key)
      end

      # Bulk fetch: returns {key => object-hash-or-nil} for each key.
      def get_objects(dockey, keys)
        res = {}
        keys.each do |key|
          res[key] = _get_object(dockey, key)
        end if keys
        res
      end

      # Retrieves set for given dockey,source,user
      def get_data(dockey,type=Hash)
        res = type == Hash ? {} : []
        if dockey
          if type == Hash
            buckets = _get_buckets(dockey)
            buckets.each do |bucket|
              members = @@db.smembers(bucket)
              members.each do |element|
                key,attrib,value = getelement(element)
                res[key] = {} unless res[key]
                res[key].merge!({attrib => value})
              end if members
            end if buckets
          else
            res = get_list(dockey)
          end
        end
        res
      end

      def get_list(dockey)
        res = []
        if dockey
          res = @@db.lrange(dockey, 0, -1)
        end
        res
      end

      # Retrieves diff data hash between two sets
      def get_diff_data(src_dockey,dst_dockey,p_size=nil)
        res = {}
        if src_dockey and dst_dockey
          # obtain combined indices
          # FIX: the first hgetall previously read dst_dockey twice, so
          # buckets present only in the source doc were never diffed
          indices = @@db.hgetall("#{src_dockey}:indices")
          indices.merge!(@@db.hgetall("#{dst_dockey}:indices"))
          indices.keys.each do |index|
            dst_bucket_name = "#{dst_dockey}:#{index}"
            src_bucket_name = "#{src_dockey}:#{index}"
            @@db.sdiff(dst_bucket_name,src_bucket_name).each do |element|
              key,attrib,value = getelement(element)
              res[key] = {} unless res[key]
              res[key].merge!({attrib => value})
            end
            break if p_size and (res.size >= p_size)
          end
        end
        if p_size
          # trim the result down to at most p_size objects
          diff = {}
          page_size = p_size
          res.each do |key,item|
            diff[key] = item
            page_size -= 1
            break if page_size <= 0
          end
          diff
        else
          res
        end
      end

      # Deletes all keys matching a given mask
      def flash_data(keymask)
        if keymask.to_s[/[*\[\]?]/]
          # If the keymask contains any pattern matching characters
          # Use keys command to find all keys matching pattern (this is extremely expensive)
          # Then delete matches
          @@db.keys(keymask).each do |key|
            _delete_doc(key)
          end
        else
          # The keymask doesn't contain pattern matching characters
          # A delete call is all that is needed
          _delete_doc(keymask)
        end
      end

      # Lock a given key and release when provided block is finished
      # NOTE(review): no ensure here - if the block raises, the lock is
      # left to expire on its own; confirm that is the intended behavior.
      def lock(dockey,timeout=0,raise_on_expire=false)
        m_lock = get_lock(dockey,timeout,raise_on_expire)
        res = yield
        release_lock(dockey,m_lock)
        res
      end

      # Acquires the lock for dockey via SETNX, spinning until it is free.
      # The stored value is the lock's expiration timestamp; an expired
      # lock may be taken over with GETSET. Returns the timestamp written,
      # which the caller passes back to release_lock.
      def get_lock(dockey,timeout=0,raise_on_expire=false)
        lock_key = _lock_key(dockey)
        current_time = Time.now.to_i
        # NOTE(review): ts is computed once and never refreshed while
        # spinning, so a long wait shortens the effective lock duration.
        ts = current_time+(Rhoconnect.lock_duration || timeout)+1
        loop do
          if not @@db.setnx(lock_key,ts)
            current_lock = @@db.get(lock_key)
            # ensure lock wasn't released between the setnx and get calls
            if current_lock
              current_lock_timeout = current_lock.to_i
              if raise_on_expire or Rhoconnect.raise_on_expired_lock
                if current_lock_timeout <= current_time
                  # lock expired before operation which set it up completed
                  # this process cannot continue without corrupting locked data
                  raise StoreLockException, "Lock \"#{lock_key}\" expired before it was released"
                end
              else
                if current_lock_timeout <= current_time and @@db.getset(lock_key,ts).to_i <= current_time
                  # previous lock expired and we replaced it with our own
                  break
                end
              end
            # lock was released between setnx and get - try to acquire it again
            elsif @@db.setnx(lock_key,ts)
              break
            end
            sleep(1)
            current_time = Time.now.to_i
          else
            break # no lock was set, so we set ours and leaving
          end
        end
        return ts
      end

      # Due to redis bug #140, setnx always returns true so this doesn't work
      # def get_lock(dockey,timeout=0)
      #   lock_key = _lock_key(dockey)
      #   until @@db.setnx(lock_key,1) do
      #     sleep(1)
      #   end
      #   @@db.expire(lock_key,timeout+1)
      #   Time.now.to_i+timeout+1
      # end

      # Releases the lock taken by get_lock. With expiry checking off, the
      # key is only deleted while the lock is still valid (lock >= now),
      # so a stale holder cannot delete a lock someone else re-acquired.
      def release_lock(dockey,lock,raise_on_expire=false)
        @@db.del(_lock_key(dockey)) if raise_on_expire or Rhoconnect.raise_on_expired_lock or (lock >= Time.now.to_i)
      end

      # Create a copy of srckey in dstkey
      def clone(srckey,dstkey)
        buckets = _get_bucket_indices(srckey)
        # FIX: was `if buckets.size` - 0 is truthy in Ruby, so the plain
        # (bucket-less) copy branch below was unreachable
        if buckets.size > 0
          @@db.pipelined do
            buckets.each do |bucket_index|
              _add_bucket_index(dstkey, bucket_index)
              # SDIFF against the empty key '' copies the whole set
              @@db.sdiffstore("#{dstkey}:#{bucket_index}", "#{srckey}:#{bucket_index}", '')
            end
          end
        else
          @@db.sdiffstore(dstkey,srckey,'')
        end
      end

      # Rename srckey to dstkey
      def rename(srckey,dstkey)
        buckets = _get_bucket_indices(srckey)
        # FIX: was `if buckets.size` (always truthy) - see clone above
        if buckets.size > 0
          @@db.pipelined do
            @@db.del("#{srckey}:indices")
            buckets.each do |bucket_index|
              _add_bucket_index(dstkey, bucket_index)
              @@db.rename("#{srckey}:#{bucket_index}", "#{dstkey}:#{bucket_index}")
            end
          end
        else
          @@db.rename(srckey,dstkey) if @@db.exists(srckey)
        end
      end

      # Appends data hashes to a sorted set scored by insertion batch;
      # every member of one put_zdata call shares the same score so
      # get_zdata can regroup the batch later.
      def put_zdata(dockey,assoc_key,data={},append=false)
        return true unless (dockey and assoc_key and data)
        flush_zdata(dockey) unless append
        current_score = 0
        current_score_data = Store.db.zrevrange(dockey,0,0,:with_scores => true)
        # zrevrange with scores returns [member, score]; [-1] is the score
        current_score = current_score_data[-1].to_i if current_score_data
        current_score += 1
        data.each do |key,hash_value|
          unique_record_key = setelement(current_score,assoc_key, key)
          Store.db.zadd(dockey, current_score, unique_record_key)
          Store.put_data("#{dockey}:#{unique_record_key}",{key => hash_value})
        end
        true
      end

      # Retrieves set for given dockey,associated key (client_id), obj_hashes
      # Returns [array_of_batch_hashes, array_of_assoc_keys].
      def get_zdata(dockey)
        data = Store.db.zrange(dockey, 0, -1)
        ret = []
        keys = []
        unless data.nil?
          scores = []
          data.each do |zsetkey|
            obj_hash = Store.get_data "#{dockey}:#{zsetkey}"
            score,key,objkey = getelement(zsetkey)
            if scores[-1] != score
              # new batch - start a new result hash
              ret << obj_hash
              keys << key
              scores << score
            else
              # same batch - merge into the current result hash
              ret[-1].merge!(obj_hash)
            end
          end
        end
        [ret, keys]
      end

      # Deletes all keys and their hashes from the Redis DB
      def flush_zdata(dockey)
        data = Store.db.zrange(dockey, 0, -1)
        data.each do |hash_key|
          _delete_doc("#{dockey}:#{hash_key}")
        end
        Store.db.zremrangebyrank(dockey, 0, -1)
      end

      # True if the key exists as a plain key or as a bucketed doc.
      def exists?(key)
        @@db.exists(key) || @@db.exists("#{key}:indices")
      end

      alias_method :set_value, :put_value
      alias_method :set_data, :put_data

      private

      # Builds the Redis client. REDIS_URL / REDISTOGO_URL are assumed to
      # be constants naming ENV variables, defined elsewhere - TODO confirm.
      def _get_redis(server=nil)
        url = ENV[REDIS_URL] || ENV[REDISTOGO_URL] || nil
        if url
          Redis.connect(:url => url)
        elsif server and server.is_a?(String)
          # server string format: "host:port:db:password"
          host,port,db,password = server.split(':')
          Redis.new(:thread_safe => true, :host => host, :port => port, :db => db, :password => password)
        elsif server and server.is_a?(Redis)
          server
        else
          Redis.new(:thread_safe => true)
        end
      end

      def _lock_key(dockey)
        "lock:#{dockey}"
      end

      def _is_reserved?(attrib,value) #:nodoc:
        RESERVED_ATTRIB_NAMES.include? attrib
      end

      # operations with docs that are split into buckets
      def _delete_doc(dockey)
        # check if this doc has buckets
        if(@@db.exists("#{dockey}:indices"))
          buckets_list = _get_buckets(dockey)
          # delete all buckets
          @@db.pipelined do
            @@db.del("#{dockey}:indices")
            buckets_list.each do |bucket|
              @@db.del(bucket)
            end
          end
        end
        # delete main doc
        @@db.del(dockey)
      end

      # create object's bucket index
      # using SHA1 hashing (first two hex chars -> 256 possible buckets)
      def _create_obj_index(key)
        Digest::SHA1.hexdigest(key)[0..1]
      end

      # Registers the bucket in the doc's index hash and returns the
      # fully-qualified bucket key.
      def _add_bucket_index(dockey, bucket_index)
        bucket_name = "#{dockey}:#{bucket_index}"
        @@db.hsetnx("#{dockey}:indices", bucket_index, bucket_name)
        bucket_name
      end

      def _remove_bucket_index(dockey, bucket_index)
        @@db.hdel("#{dockey}:indices", bucket_index)
      end

      def _get_bucket_indices(dockey)
        @@db.hkeys("#{dockey}:indices")
      end

      def _get_buckets(dockey)
        @@db.hvals("#{dockey}:indices")
      end

      # Drops index entries for buckets that have become empty.
      def _cleanup_buckets(dockey, keys)
        keys.each do |key|
          obj_index = _create_obj_index(key)
          bucket_name = "#{dockey}:#{obj_index}"
          _remove_bucket_index(dockey, obj_index) unless @@db.exists(bucket_name)
        end if keys
      end

      # Writes one object's attributes into its bucket set.
      def _put_object(dockey, key, obj={})
        return if obj.empty? or not dockey or not key
        obj_bucket_index = _create_obj_index(key)
        bucket_name = "#{dockey}:#{obj_bucket_index}"
        _add_bucket_index(dockey, obj_bucket_index)
        @@db.pipelined do
          obj.each do |attrib, value|
            unless _is_reserved?(attrib,value)
              @@db.sadd(bucket_name,setelement(key,attrib,value))
            end
          end
        end
      end

      # Removes one object's elements from its bucket set; returns false
      # only when any argument is missing.
      def _delete_object(dockey, key, obj)
        return false if not key or not dockey or not obj
        obj_bucket_index = _create_obj_index(key)
        bucket_name = "#{dockey}:#{obj_bucket_index}"
        @@db.pipelined do
          obj.each do |attrib,value|
            @@db.srem(bucket_name, setelement(key,attrib,value))
          end
        end
        true
      end

      # Reads one object back out of its bucket; nil if not present.
      def _get_object(dockey, key)
        return nil unless dockey
        res = nil
        obj_bucket_index = _create_obj_index(key)
        bucket_name = "#{dockey}:#{obj_bucket_index}"
        key_str_size = "#{key}".size
        members = @@db.smembers(bucket_name)
        members.each do |element|
          # a bucket holds many objects - keep only elements for this key
          next unless element[0..key_str_size - 1] == key
          obj_id,attrib,value = getelement(element)
          res ||= {}
          res[attrib] = value
        end if members
        res
      end
    end
  end
end