lib/plezi/handlers/ws_identity.rb in plezi-0.12.14 vs lib/plezi/handlers/ws_identity.rb in plezi-0.12.15

- old
+ new

@@ -37,10 +37,22 @@ @cache[key] ||= [] @cache[key].unshift value @cache[key].count end end + def lpop key + sync do + @cache[key] ||= [] + @cache[key].shift + end + end + def lrem key, count, value + sync do + @cache[key] ||= [] + @cache[key].delete(value) + end + end def rpush key, value sync do @cache[key] ||= [] @cache[key].push value @cache[key].count @@ -93,12 +105,11 @@ protected # @!visibility public # The following method registers the connections as a unique global identity. # - # Like {Plezi::Base::WSObject::SuperClassMethods#notify}, using this method requires an active Redis connection - # to be set up. See {Plezi#redis} for more information. + # The Identity API works best when a Redis server is used. See {Plezi#redis} for more information. # # By default, only one connection at a time can respond to identity events. If the same identity # connects more than once, only the last connection will receive the notifications. # This default may be controlled by setting the `:max_connections` option to a number greater than 1. # @@ -108,33 +119,45 @@ # # The option's Hash, at the moment, accepts only the following (optional) options: # lifetime:: sets how long the identity can survive. defaults to `604_800` seconds (7 days). # max_connections:: sets the amount of concurrent connections an identity can have (akin to open browser tabs receiving notifications). defaults to 1 (a single connection). # + # Lifetimes are renewed with each registration and when a connected Identoty receives a notification. + # Identities should have a reasonable lifetime. For example, a 10 minutes long lifetime (60*10) + # may prove ineffective. When using such short lifetimes, consider the possibility that `unicast` might provide be a better alternative. + # + # A lifetime cannot (by design) be shorter than 10 minutes. + # # Calling this method will also initiate any events waiting in the identity's queue. # make sure that the method is only called once all other initialization is complete. # + # i.e. + # + # register_as session.id, lifetime: 60*60*24, max_connections: 4 + # # Do NOT call this method asynchronously unless Plezi is set to run as in a single threaded mode - doing so # will execute any pending events outside the scope of the IO's mutex lock, thus introducing race conditions. def register_as identity, options = {} redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion options[:max_connections] ||= 1 options[:max_connections] = 1 if options[:max_connections].to_i < 1 options[:lifetime] ||= 604_800 + options[:lifetime] = 600 if options[:lifetime].to_i < 600 identity = identity.to_s.freeze - @___identity ||= [].to_set - @___identity << identity - redis.pipelined do + @___identity ||= {} + @___identity[identity] = options + redis.multi do + redis.lpop(identity) + redis.lpush(identity, ''.freeze) + redis.lrem "#{identity}_uuid".freeze, 0, uuid redis.lpush "#{identity}_uuid".freeze, uuid redis.ltrim "#{identity}_uuid".freeze, 0, (options[:max_connections]-1) - end - redis.lpush(identity, ''.freeze) unless redis.llen(identity) > 0 - ___review_identity identity - redis.pipelined do redis.expire identity, options[:lifetime] redis.expire "#{identity}_uuid".freeze, options[:lifetime] end + ___review_identity identity + identity end # @!visibility public # sends a notification to an Identity. Returns false if the Identity never registered or it's registration expired. def notify identity, event_name, *args @@ -151,25 +174,48 @@ module SuperInstanceMethods protected def ___review_identity identity redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion identity = identity.to_s.freeze - return Iodine.warn("Identity message reached wrong target (ignored).").clear unless @___identity.include?(identity) + return Iodine.warn("Identity message reached wrong target (ignored).").clear unless @___identity[identity] messages = redis.multi do redis.lrange identity, 1, -1 redis.ltrim identity, 0, 0 + redis.expire identity, @___identity[identity][:lifetime] + redis.expire "#{identity}_uuid".freeze, @___identity[identity][:lifetime] end[0] targets = redis.lrange "#{identity}_uuid", 0, -1 + targets.delete(uuid) while msg = messages.shift msg = ::Plezi::Base::WSObject.translate_message(msg) next unless msg Iodine.error("Notification recieved but no method can handle it - dump:\r\n #{msg.to_s}") && next unless self.class.has_super_method?(msg[:method]) - # targets.each {|target| target == uuid ? self.method(msg[:method]).call(*msg[:data]) : unicast(target, msg[:method], *msg[:data])} - targets.each {|target| unicast(target, msg[:method], *msg[:data])} # this allows for async execution - + Iodine.run do + targets.each {|target| unicast(target, msg[:method], *msg[:data]) } + end + self.method(msg[:method]).call(*msg[:data]) end + # ___extend_lifetime identity end + + # # re-registers the Identity, extending it's lifetime + # # and making sure it's still valid. + # def ___extend_lifetime identity + # return unless @___identity + # redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion + # options = @___identity[identity] + # return unless options + # redis.multi do + # # redis.lpop(identity) + # # redis.lpush(identity, ''.freeze) + # # redis.lrem "#{identity}_uuid".freeze, 0, uuid + # # redis.lpush "#{identity}_uuid".freeze, uuid + # # redis.ltrim "#{identity}_uuid".freeze, 0, (options[:max_connections]-1) + # redis.expire identity, options[:lifetime] + # redis.expire "#{identity}_uuid".freeze, options[:lifetime] + # end + # end end module SuperClassMethods public @@ -178,9 +224,10 @@ redis = Plezi.redis || ::Plezi::Base::WSObject::RedisEmultaion identity = identity.to_s.freeze return false unless redis.llen(identity).to_i > 0 redis.rpush identity, ({method: event_name, data: args}).to_yaml redis.lrange("#{identity}_uuid".freeze, 0, -1).each {|target| unicast target, :___review_identity, identity } + # puts "pushed notification #{event_name}" true end # returns true if the Identity in question is registered to receive notifications. def registered? identity