lib/juggernaut/client.rb in juggernaut-0.5.7 vs lib/juggernaut/client.rb in juggernaut-0.5.8
- old
+ new
@@ -1,157 +1,263 @@
+require 'timeout'
+require 'net/http'
+require 'net/https'
require 'uri'
+
module Juggernaut
class Client
include Juggernaut::Miscel
-
- attr_reader :id
- attr_accessor :session_id
- attr_reader :connections
- @@clients = []
- class << self
- # Actually does a find_or_create_by_id
- def find_or_create(subscriber, request)
- if client = find_by_id(request[:client_id])
- client.session_id = request[:session_id]
- client.add_new_connection(subscriber)
- client
- else
- self.new(subscriber, request)
- end
- end
+ @@clients = [ ]
- def add_client(client)
- @@clients << client unless @@clients.include?(client)
- end
+ attr_reader :id
+ attr_accessor :session_id
+ attr_reader :connections
- # Client find methods
+ class << self
+ # Actually does a find_or_create_by_id
+ def find_or_create(subscriber, request)
+ if client = find_by_id(request[:client_id])
+ client.session_id = request[:session_id]
+ client.add_new_connection(subscriber)
+ client
+ else
+ self.new(subscriber, request)
+ end
+ end
- def find_all
- @@clients
- end
+ # Client find methods
+ def find_all
+ @@clients
+ end
- def find(&block)
- @@clients.select(&block).uniq
- end
+ def find(&block)
+ @@clients.select(&block).uniq
+ end
- def find_by_id(id)
- find {|client| client.id == id }.first
- end
+ def find_by_id(id)
+ find { |client| client.id == id }.first
+ end
- def find_by_signature(signature)
- # signature should be unique
- find {|client|
- client.connections.select {|connection| connection.signature == signature }.any?
- }.first
- end
+ def find_by_signature(signature)
+ # signature should be unique
+ find do |client|
+ client.connections.select { |connection| connection.signature == signature }.any?
+ end.first
+ end
- def find_by_channels(channels)
- find {|client|
- client.has_channels?(channels)
- }
- end
+ def find_by_channels(channels)
+ find do |client|
+ client.has_channels?(channels)
+ end
+ end
- def find_by_id_and_channels(id, channels)
- find {|client|
- client.has_channels?(channels) && client.id == id
- }.first
- end
+ def find_by_id_and_channels(id, channels)
+ find do |client|
+ client.has_channels?(channels) && client.id == id
+ end.first
+ end
- def send_logouts_after_timeout
- @@clients.each do |client|
- client.logout_request if !client.alive? and client.give_up? and !client.sent_logout?
- end
- end
+ def send_logouts_after_timeout
+ @@clients.each do |client|
+ if client.give_up?
+ client.logout_request
+ end
+ end
+ end
- # Called when the server is shutting down
- def send_logouts_to_all_clients
- @@clients.each do |client|
- client.logout_request if !client.sent_logout?
- end
- end
- end
+ # Called when the server is shutting down
+ def send_logouts_to_all_clients
+ @@clients.each do |client|
+ client.logout_request
+ end
+ end
- def initialize(subscriber, request)
- @connections = []
- @id = request[:client_id]
- @session_id = request[:session_id]
- add_new_connection(subscriber)
- end
+ def reset!
+ @@clients.clear
+ end
- def to_json
- {
- :id => @id,
- :session_id => @session_id
- }.to_json
- end
+ def register_client(client)
+ @@clients << client unless @@clients.include?(client)
+ end
- def add_new_connection(subscriber)
- @connections << subscriber
- end
+ def client_registered?(client)
+ @@clients.include?(client)
+ end
- def subscription_request(channels)
- return true unless options[:subscription_url]
- post_request(options[:subscription_url], channels)
- end
+ def unregister_client(client)
+ @@clients.delete(client)
+ end
+ end
- def logout_connection_request(channels)
- return true unless options[:logout_connection_url]
- post_request(options[:logout_connection_url], channels)
- end
+ def initialize(subscriber, request)
+ @connections = []
+ @id = request[:client_id]
+ @session_id = request[:session_id]
+ @messages = []
+ @logout_timeout = 0
+ self.register
+ add_new_connection(subscriber)
+ end
- def logout_request
- return true unless options[:logout_url]
- @sent_logout = true
- post_request(options[:logout_url])
- end
+ def to_json
+ {
+ :client_id => @id,
+ :num_connections => @connections.size,
+ :session_id => @session_id
+ }.to_json
+ end
- def sent_logout?
- !!@sent_logout
- end
+ def add_new_connection(subscriber)
+ @connections << subscriber
+ end
- def send_message(msg, channels = nil)
- @connections.each do |em|
- em.broadcast(msg) if !channels or channels.empty? or em.has_channels?(channels)
- end
- end
+ def friendly_id
+ if self.id
+ "with ID #{self.id}"
+ else
+ "session #{self.session_id}"
+ end
+ end
- def has_channels?(channels)
- @connections.each do |em|
- return true if em.has_channels?(channels)
- end
- false
- end
+ def subscription_request(channels)
+ return true unless options[:subscription_url]
+ post_request(options[:subscription_url], channels, :timeout => options[:post_request_timeout] || 5)
+ end
- def remove_channels!(channels)
- @connections.each do |em|
- em.remove_channels!(channels)
- end
- end
+ def logout_connection_request(channels)
+ return true unless options[:logout_connection_url]
+ post_request(options[:logout_connection_url], channels, :timeout => options[:post_request_timeout] || 5)
+ end
- def alive?
- @connections.select{|em| em.alive? }.any?
- end
+ def logout_request
+ self.unregister
+ logger.debug("Timed out client #{friendly_id}")
+ return true unless options[:logout_url]
+ post_request(options[:logout_url], [ ], :timeout => options[:post_request_timeout] || 5)
+ end
+
+ def remove_connection(connection)
+ @connections.delete(connection)
+ self.reset_logout_timeout!
+ self.logout_request if self.give_up?
+ end
- def give_up?
- @connections.select {|em| em.logout_timeout and Time.now > em.logout_timeout }.any?
- end
+ def send_message(msg, channels = nil)
+ store_message(msg, channels) if options[:store_messages]
+ send_message_to_connections(msg, channels)
+ end
- private
+ # Send messages that are queued up for this particular client.
+ # Messages are only queued for previously-connected clients.
+ def send_queued_messages(connection)
+ self.expire_queued_messages!
- def post_request(url, channels = [])
- url = URI.parse(url)
- params = []
- params << "client_id=#{id}" if id
- params << "session_id=#{session_id}" if session_id
- channels.each {|chan| params << "channels[]=#{chan}" }
- url.query = params.join('&')
- begin
- open(url.to_s, "User-Agent" => "Ruby/#{RUBY_VERSION}")
- rescue => e
- logger.debug("Bad response from #{url.to_s}: #{e}")
- return false
- end
- true
- end
- end
- end
\ No newline at end of file
+ # Weird looping because we don't want to send messages that get
+ # added to the array after we start iterating (since those will
+ # get sent to the client anyway).
+ @length = @messages.length
+
+ logger.debug("Sending #{@length} queued message(s) to client #{friendly_id}")
+
+ @length.times do |id|
+ message = @messages[id]
+ send_message_to_connection(connection, message[:message], message[:channels])
+ end
+ end
+
+ def channels
+ @connections.collect { |em| em.channels }.flatten.uniq
+ end
+
+ def has_channels?(channels)
+ @connections.each do |em|
+ return true if em.has_channels?(channels)
+ end
+ false
+ end
+
+ def remove_channels!(channels)
+ @connections.each do |em|
+ em.remove_channels!(channels)
+ end
+ end
+
+ def alive?
+ @connections.select { |em| em.alive? }.any?
+ end
+
+ # This client is only dead if there are no connections and we are
+ # past the timeout (if we are within the timeout, the user could
+ # just be doing a page reload or going to a new page)
+ def give_up?
+ !alive? and (Time.now > @logout_timeout)
+ end
+
+ protected
+
+ def register
+ self.class.register_client(self)
+ end
+
+ def registered?
+ self.class.client_registered?(self)
+ end
+
+ def unregister
+ self.class.unregister_client(self)
+ end
+
+ def post_request(url, channels = [ ], options = { })
+ uri = URI.parse(url)
+ uri.path = '/' if uri.path == ''
+ params = []
+ params << "client_id=#{id}" if id
+ params << "session_id=#{session_id}" if session_id
+ channels.each {|chan| params << "channels[]=#{chan}" }
+ headers = {"User-Agent" => "Ruby/#{RUBY_VERSION}"}
+ begin
+ http = Net::HTTP.new(uri.host, uri.port)
+ if uri.scheme == 'https'
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ end
+ http.read_timeout = options[:timeout] || 5
+ resp, data = http.post(uri.path, params.join('&'), headers)
+ return resp.is_a?(Net::HTTPOK)
+ rescue => e
+ logger.error("Bad request #{url.to_s} (#{e.class}: #{e.message})")
+ return false
+ rescue Timeout::Error
+ logger.error("#{url.to_s} timeout")
+ return false
+ end
+ end
+
+ def send_message_to_connections(msg, channels)
+ @connections.each do |connection|
+ send_message_to_connection(connection, msg, channels)
+ end
+ end
+
+ def send_message_to_connection(connection, msg, channels)
+ connection.broadcast(msg) if !channels or channels.empty? or connection.has_channels?(channels)
+ end
+
+ # Queued messages are stored until a timeout is reached which is the
+ # same as the connection timeout. This takes care of messages that
+ # come in between page loads or ones that come in right when you are
+ # clicking off one page and loading the next one.
+ def store_message(msg, channels)
+ self.expire_queued_messages!
+ @messages << { :channels => channels, :message => msg, :timeout => Time.now + options[:timeout] }
+ end
+
+ def expire_queued_messages!
+ @messages.reject! { |message| Time.now > message[:timeout] }
+ end
+
+ def reset_logout_timeout!
+ @logout_timeout = Time.now + options[:timeout]
+ end
+ end
+end