#--
# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++

require 'redis'
require 'rufus-json'
require 'ruote/storage/base'
require 'ruote/redis/version'


module Ruote
module Redis

  #
  # A Redis storage for ruote.
  #
  # The constructor accepts two arguments, the first one is a Redis instance
  # (see http://github.com/ezmobius/redis-rb), the second one is the classic
  # ruote engine options (see
  # http://ruote.rubyforge.org/configuration.html#engine)
  #
  #   require 'redis' # gem install redis
  #   require 'ruote' # gem install ruote
  #   require 'ruote-redis' # gem install ruote-redis
  #
  #   engine = Ruote::Engine.new(
  #     Ruote::Worker.new(
  #       Ruote::Redis::RedisStorage.new('db'=> 14, 'thread_safe' => true)))
  #
  #
  # == em-redis
  #
  # Not tried, but I guess, that substituting an instance of em-redis for
  # the redis instance passed to the constructor might work.
  # http://github.com/madsimian/em-redis
  #
  # If you try and it works, feedback is welcome
  # http://groups.google.com/group/openwferu-users
  #
  #
  # == 'pop_count' option
  #
  # By default, when the worker queries this storage for msgs to process,
  # the storage will try to pop 28 msgs. This number can be changed thanks
  # to the 'pop_count' option, like in:
  #
  #   engine = Ruote::Engine.new(
  #     Ruote::Worker.new(
  #       Ruote::Redis::RedisStorage.new(
  #         'db'=> 14, 'thread_safe' => true, 'pop_count' => 56)))
  #
  # Don't put too high a number, it increases the chance of msgs getting lost
  # in case of the worker going down.
  #
  # (if there is a need to avoid such a scenario in the future,
  # Redis' rpoplpush might come in handy).
  #
  class Storage

    include Ruote::StorageBase

    attr_reader :redis

    # Listing the redis options to differentiate them from ruote storage
    # options.
    #
    REDIS_OPTIONS = %w[ host port path db thread_safe logger ]

    # A Redis storage for ruote.
    #
    # Can be initialized in two ways
    #
    #   Ruote::Redis::Storage.new(
    #     ::Redis.new(
    #       :host => '127.0.0.1',
    #       :db => 13,
    #       :thread_safe => true))
    #
    # or
    #
    #   Ruote::Redis::Storage.new(
    #     'host' => '127.0.0.1',
    #     'db' => 13,
    #     'thread_safe' => true)
    #
    # The first style is probably better avoided.
    #
    # With the second style, the keys listed in REDIS_OPTIONS are handed
    # (symbolized) to ::Redis.new, the remaining entries are kept as the
    # storage/engine options.
    #
    def initialize(redis, options={})

      if options == {} && redis.is_a?(Hash)

        # single hash argument : split it into redis and ruote options

        redis_options, options = redis.partition { |k, v|
          REDIS_OPTIONS.include?(k.to_s)
        }

        redis_options = Hash[redis_options.collect { |k, v| [ k.to_sym, v ] }]
        options = Hash[options]

        redis = ::Redis.new(redis_options)
      end

      @redis = redis
      @options = options

      @pop_count = @options['pop_count'] || 28

      # Returns an array of the (String) keys that match the given pattern.
      #
      # Returns an empty array if anything goes wrong.
      #
      def @redis.keys_to_a(pattern)

        if (a = (keys(pattern) rescue nil)).is_a?(Array)
          a
        else
          []
        end
      end

      replace_engine_configuration(options)
    end

    # Returns true if the doc is successfully deleted.
    #
    # Docs of type 'msgs' are always considered reserved (they have already
    # been popped from the 'msgs' list by #get_msgs).
    #
    def reserve(doc)

      return true if doc['type'] == 'msgs'

      (@redis.del(key_for(doc)) == 1)
    end

    # Prepares a msg doc, stamps it with 'put_at' and pushes it (JSON
    # encoded) onto the left of the 'msgs' list. Always returns nil.
    #
    def put_msg(action, options)

      doc = prepare_msg_doc(action, options)

      doc['put_at'] = Ruote.now_to_utc_s

      @redis.lpush('msgs', Rufus::Json.encode(doc))

      nil
    end

    # Pops up to @pop_count msgs from the right of the 'msgs' list (in a
    # single pipelined call) and returns them decoded. Empty pops (nil) are
    # discarded, so the returned array may hold less than @pop_count msgs.
    #
    def get_msgs

      @redis.pipelined {
        @pop_count.times { @redis.rpop('msgs') }
      }.compact.collect { |d|
        from_json(d)
      }
    end

    # Stores a schedule doc and returns its '_id'.
    #
    # Returns nil when prepare_schedule_doc yields no doc.
    #
    def put_schedule(flavour, owner_fei, s, msg)

      doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

      return nil unless doc

      doc['_rev'] = '0'
      doc['put_at'] = Ruote.now_to_utc_s

      @redis.set(key_for(doc), Rufus::Json.encode(doc))

      doc['_id']
    end

    # Removes the schedule with the given id. Does nothing if the id is nil.
    #
    def delete_schedule(schedule_id)

      return unless schedule_id

      @redis.del(key_for('schedules', schedule_id))
    end

    # Puts a document in the storage (while holding the lock for its key).
    #
    # Returns
    #
    # * nil when the put is successful
    # * true when the doc has a '_rev' but is not in the storage (deleted)
    # * the doc currently in storage when the revs don't match
    #
    # When opts[:update_rev] is set, the doc passed as argument gets its
    # '_rev' and 'put_at' updated in place.
    #
    def put(doc, opts={})

      key = key_for(doc)
      rev = doc['_rev']

      lock(key) do

        current_doc = do_get(key)
        current_rev = current_doc ? current_doc['_rev'] : nil

        if current_rev && rev != current_rev
          #
          # version in storage is newer than version being put,
          # (return version in storage)
          #
          current_doc

        elsif rev && current_rev.nil?
          #
          # document deleted, put fails (return true)
          #
          true

        else
          #
          # put is successful (return nil)
          #
          doc = doc.send(
            opts[:update_rev] ? :merge! : :merge,
            { '_rev' => (rev.to_i + 1).to_s, 'put_at' => Ruote.now_to_utc_s })

          @redis.set(key, Rufus::Json.encode(doc))

          nil
        end
      end
    end

    # Returns the (decoded) document for the given type and key, or nil if
    # there is no such document.
    #
    def get(type, key)

      do_get(key_for(type, key))
    end

    # Deletes a document (while holding the lock for its key).
    #
    # Raises an ArgumentError if the doc has no '_rev'.
    #
    # Returns
    #
    # * nil when the delete is successful
    # * true when the doc is already gone
    # * the doc currently in storage when the revs don't match
    #
    def delete(doc)

      rev = doc['_rev']

      raise ArgumentError, "can't delete doc without _rev" unless rev

      key = key_for(doc)

      lock(key) do

        current_doc = do_get(key)

        if current_doc.nil?
          #
          # document is [already] gone, delete fails (return true)
          #
          true

        elsif current_doc['_rev'] != rev
          #
          # version in storage doesn't match version to delete
          # (return version in storage)
          #
          current_doc

        else
          #
          # delete is successful (return nil)
          #
          @redis.del(key)

          nil
        end
      end
    end

    # Returns the documents of a given type.
    #
    # key may be nil (all the documents), a String or an array of Strings
    # (turned into redis key patterns), or a Regexp or an array of Regexps
    # (matched against the stored ids).
    #
    # Understood options :
    #
    # * :descending : sort by descending '_id'
    # * :skip : number of ids to skip
    # * :limit : max number of ids to consider
    # * :count : return the number of docs (after :skip and :limit have
    #   been applied) instead of the docs themselves
    #
    def get_many(type, key=nil, opts={})

      keys = key ? Array(key) : nil

      #ids = if type == 'msgs' || type == 'schedules'
      #  @redis.keys_to_a("#{type}/*")

      ids = if keys == nil

        @redis.keys_to_a("#{type}/*")

      elsif keys.first.is_a?(String)

        keys.collect { |k|
          @redis.keys_to_a(
            "#{type}/*!#{k}#{type == 'schedules' ? '-*' : ''}")
        }.flatten

      else #if keys.first.is_a?(Regexp)

        @redis.keys_to_a("#{type}/*").select { |i|

          i = i[type.length + 1..-1]
            # removing "^type/"

          keys.find { |k| k.match(i) }
        }
      end

      ids = ids.reject { |i| i.match(LOCK_KEY) }
      ids = ids.sort
      ids = ids.reverse if opts[:descending]

      skip = opts[:skip] || 0
      limit = opts[:limit] || ids.length

      # Array#[start, length] returns nil when start is beyond the end of
      # the array, hence the fallback to an empty array
      #
      ids = ids[skip, limit] || []

      docs = ids.length > 0 && @redis.mget(*ids)
      docs = docs.is_a?(Array) ? docs : []

      docs = docs.each_with_object({}) do |doc, h|
        next unless doc
          # the doc may have vanished between the keys and the mget calls
        doc = Rufus::Json.decode(doc)
        h[doc['_id']] = doc
      end

      return docs.size if opts[:count]

      docs = docs.values.sort_by { |d| d['_id'] }

      opts[:descending] ? docs.reverse : docs
    end

    # Returns the sorted list of ids for the documents of the given type
    # (lock keys are left out).
    #
    def ids(type)

      @redis.keys_to_a("#{type}/*").reject { |i|
        i.match(LOCK_KEY)
      }.collect { |i|
        i.split('/').last
      }.sort
    end

    # Flushes the whole redis db this storage is connected to.
    #
    def purge!

      2.times { @redis.flushdb rescue nil }
        # 2 times to work around Redis::ProtocolError '3'
    end

    # Shuts this worker down.
    #
    # (This close / shutdown dichotomy has to be resolved at some point...)
    #
    def close

      @redis.quit
    end

    # Shuts this worker down.
    #
    # (This close / shutdown dichotomy has to be resolved at some point...)
    #
    def shutdown

      @redis.quit
    end

    # Mainly used by ruote's test/unit/ut_17_storage.rb
    #
    def add_type(type)

      # nothing to be done
    end

    # Nukes a db type and reputs it (losing all the documents that were in
    # it).
    #
    def purge_type!(type)

      @redis.keys_to_a("#{type}/*").each { |k| (@redis.del(k) rescue nil) }
    end

    # Simply calls @redis.reconnect
    #
    def reconnect

      @redis.reconnect
    end

    protected

    LOCK_KEY = /-lock$/

    # A locking mecha.
    #
    # Mostly inspired from http://code.google.com/p/redis/wiki/SetnxCommand
    #
    # Spins until the "#{key}-lock" key could be set, calls the block and
    # returns its result. The lock is released even if the block raises.
    #
    def lock(key, &block)

      kl = "#{key}-lock"

      loop do

        break if @redis.setnx(kl, Time.now.to_f.to_s) != false
          # locking successful

        #
        # already locked

        t = @redis.get(kl)

        @redis.del(kl) if t && Time.now.to_f - t.to_f > 60.0
          # after 1 minute, locks time out

        sleep 0.007 # let's try to lock again after a while
      end

      #@redis.expire(kl, 2)
        # this doesn't work, it makes the next call to setnx succeed

      begin
        block.call
      ensure
        #
        # always release the lock, else a failing block would leave the
        # document locked until the 1 minute timeout
        #
        @redis.del(kl)
      end
    end

    #   key_for(doc)
    #   key_for(type, key)
    #
    # Returns the redis key ("type/id") for a doc or for a type / key pair.
    #
    def key_for(*args)

      a = args.first

      (a.is_a?(Hash) ? [ a['type'], a['_id'] ] : args[0, 2]).join('/')
    end

    # Fetches and decodes the document stored under the given redis key.
    # Returns nil if there is nothing under that key.
    #
    def do_get(key)

      from_json(@redis.get(key))
    end

    # Decodes a JSON string, returns nil if the argument is nil (or false).
    #
    def from_json(s)

      s ? Rufus::Json.decode(s) : nil
    end
  end

  #
  # Keeping Ruote::Redis::RedisStorage for backward compatibility.
  #
  class RedisStorage < Storage
  end
end
end