lib/plezi/handlers/ws_identity.rb in plezi-0.12.6 vs lib/plezi/handlers/ws_identity.rb in plezi-0.12.7

- old
+ new

@@ -2,10 +2,91 @@ module Base module WSObject + module RedisEmultaion + public + def lrange key, first, last = -1 + sync do + return [] unless @cache[key] + @cache[key][first..last] || [] + end + end + def llen key + sync do + return 0 unless @cache[key] + @cache[key].count + end + end + def ltrim key, first, last = -1 + sync do + return "OK".freeze unless @cache[key] + @cache[key] = @cache[key][first..last] + "OK".freeze + end + end + def del *keys + sync do + ret = 0 + keys.each {|k| ret += 1 if @cache.delete k } + ret + end + end + def lpush key, value + sync do + @cache[key] ||= [] + @cache[key].unshift value + @cache[key].count + end + end + def rpush key, value + sync do + @cache[key] ||= [] + @cache[key].push value + @cache[key].count + end + end + def expire key, seconds + Iodine.warn "Identity API requires Redis - no persistent storage!" + sync do + return 0 unless @cache[key] + if @timers[key] + @timers[key].stop! + end + @timers[key] = (Iodine.run_after(seconds) { self.del key }) + end + end + def multi + sync do + @results = [] + yield(self) + ret = @results + @results = nil + ret + end + end + alias :pipelined :multi + protected + @locker = Mutex.new + @cache = Hash.new + @timers = Hash.new + + def sync &block + if @locker.locked? && @locker.owned? + ret = yield + @results << ret if @results + ret + else + @locker.synchronize { sync &block } + end + end + + public + extend self + end + # the following are additions to the WebSocket Object module, # to establish identity to websocket realtionships, allowing for a # websocket message bank. module InstanceMethods @@ -14,40 +95,44 @@ # The following method registers the connections as a unique global identity. # # Like {Plezi::Base::WSObject::SuperClassMethods#notify}, using this method requires an active Redis connection # to be set up. See {Plezi#redis} for more information. # - # Only one connection at a time can respond to identity events. If the same identity + # By default, only one connection at a time can respond to identity events. If the same identity # connects more than once, only the last connection will receive the notifications. + # This default may be controlled by setting the `:max_connections` option to a number greater than 1. # # The method accepts: # identity:: a global application wide unique identifier that will persist throughout all of the identity's connections. # options:: an option's hash that sets the properties of the identity. # - # The option's Hash, at the moment, accepts only the following (optional) option: + # The option's Hash, at the moment, accepts only the following (optional) options: # lifetime:: sets how long the identity can survive. defaults to `604_800` seconds (7 days). + # max_connections:: sets the amount of concurrent connections an identity can have (akin to open browser tabs receiving notifications). defaults to 1 (a single connection). # # Calling this method will also initiate any events waiting in the identity's queue. # make sure that the method is only called once all other initialization is complete. # # Do NOT call this method asynchronously unless Plezi is set to run as in a single threaded mode - doing so # will execute any pending events outside the scope of the IO's mutex lock, thus introducing race conditions. def register_as identity, options = {} - redis = Plezi.redis - raise "The identity API requires a Redis connection" unless redis + redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion + options[:max_connections] ||= 1 + options[:max_connections] = 1 if options[:max_connections].to_i < 1 + options[:lifetime] ||= 604_800 identity = identity.to_s.freeze @___identity ||= [].to_set @___identity << identity redis.pipelined do redis.lpush "#{identity}_uuid".freeze, uuid - redis.ltrim "#{identity}_uuid".freeze, 0, 0 + redis.ltrim "#{identity}_uuid".freeze, 0, (options[:max_connections]-1) end - ___review_identity identity redis.lpush(identity, ''.freeze) unless redis.llen(identity) > 0 + ___review_identity identity redis.pipelined do - redis.expire identity, (options[:lifetime] || 604_800) - redis.expire "#{identity}_uuid".freeze, (options[:lifetime] || 604_800) + redis.expire identity, options[:lifetime] + redis.expire "#{identity}_uuid".freeze, options[:lifetime] end end # sends a notification to an Identity. Returns false if the Identity never registered or it's registration expired. def notify identity, event_name, *args @@ -61,47 +146,44 @@ module ClassMethods end module SuperInstanceMethods protected def ___review_identity identity - redis = Plezi.redis - raise "unknown Redis initiation error" unless redis + redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion identity = identity.to_s.freeze return Iodine.warn("Identity message reached wrong target (ignored).").clear unless @___identity.include?(identity) - redis.multi do - redis.lpush identity, ''.freeze - redis.lpush identity, ''.freeze - end - msg = redis.rpop(identity) - Iodine.error "Unknown Identity Queue error - both messages and identity might be lost!\nExpected no data, but got: #{msg}" unless msg == ''.freeze - while (msg = redis.rpop(identity)) && msg != ''.freeze + messages = redis.multi do + redis.lrange identity, 1, -1 + redis.ltrim identity, 0, 0 + end[0] + targets = redis.lrange "#{identity}_uuid", 0, -1 + while msg = messages.shift msg = ::Plezi::Base::WSObject.translate_message(msg) next unless msg Iodine.error("Notification recieved but no method can handle it - dump:\r\n #{msg.to_s}") && next unless self.class.has_super_method?(msg[:method]) - self.method(msg[:method]).call *msg[:data] + # targets.each {|target| target == uuid ? self.method(msg[:method]).call(*msg[:data]) : unicast(target, msg[:method], *msg[:data])} + targets.each {|target| unicast(target, msg[:method], *msg[:data])} # this allows for async execution + end end end module SuperClassMethods public # sends a notification to an Identity. Returns false if the Identity never registered or it's registration expired. def notify identity, event_name, *args - redis = Plezi.redis - raise "The identity API requires a Redis connection" unless redis + redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion identity = identity.to_s.freeze return false unless redis.llen(identity).to_i > 0 - redis.lpush identity, ({method: event_name, data: args}).to_yaml - target_uuid = redis.lindex "#{identity}_uuid".freeze, 0 - unicast target_uuid, :___review_identity, identity if target_uuid + redis.rpush identity, ({method: event_name, data: args}).to_yaml + redis.lrange("#{identity}_uuid".freeze, 0, -1).each {|target| unicast target, :___review_identity, identity } true end # returns true if the Identity in question is registered to receive notifications. def registered? identity - redis = Plezi.redis - return Iodine.warn("Cannot check for Identity registration without a Redis connection (silent).") && false unless redis + redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion identity = identity.to_s.freeze redis.llen(identity).to_i > 0 end end end