require 'eventmachine' require 'socket' require 'json' require 'open-uri' require 'fileutils' require 'digest/sha1' module Juggernaut class Server < EventMachine::Connection include Juggernaut::Miscel class InvalidRequest < Juggernaut::JuggernautError #:nodoc: end class InvalidCommand < Juggernaut::JuggernautError #:nodoc: end class CorruptJSON < Juggernaut::JuggernautError #:nodoc: end class MalformedBroadcast < Juggernaut::JuggernautError #:nodoc: end class MalformedSubscribe < Juggernaut::JuggernautError #:nodoc: end class UnauthorisedSubscription < Juggernaut::JuggernautError #:nodoc: end class MalformedQuery < Juggernaut::JuggernautError #:nodoc: end class UnauthorisedBroadcast < Juggernaut::JuggernautError #:nodoc: end class UnauthorisedQuery < Juggernaut::JuggernautError #:nodoc: end POLICY_FILE = <<-EOF EOF POLICY_REQUEST = "" CR = "\0" attr_reader :current_msg_id attr_reader :messages attr_reader :connected attr_reader :logout_timeout attr_reader :status attr_reader :channels attr_reader :client # EM methods def post_init logger.debug "New client [#{client_ip}]" @client = nil @channels = [] @current_msg_id = 0 @connected = true @logout_timeout = nil @buffer = '' @http_request = false end # Juggernaut packets are terminated with "\0" # so we need to buffer the data until we find the # terminating "\0" def receive_data(data) @buffer << data @buffer = process_whole_messages(@buffer) end def strip_http_if_need(data) return data if data !~ /^POST / out = data.split("\r\n\r\n") if out.size == 2 out = out.last @http_request = true out.strip + "\0" else data end end # process any whole messages in the buffer, # and return the new contents of the buffer def process_whole_messages(data) data = strip_http_if_need(data) return data if data !~ /\0/ # only process if data contains a \0 char messages = data.split("\0") if data =~ /\0$/ data = '' else # remove the last message from the list (because it is incomplete) before processing data = messages.pop end messages.each {|message| process_message(message.strip)} return data end def process_message(ln) logger.debug "Processing message: #{ln}" @request = nil if ln == POLICY_REQUEST logger.debug "Sending crossdomain file" send_data POLICY_FILE.gsub('PORT', (options[:public_port]||options[:port]).to_s) close_connection_after_writing return end begin @request = JSON.parse(ln) unless ln.empty? rescue raise CorruptJSON, ln end raise InvalidRequest, ln if !@request @request.symbolize_keys! # For debugging @request[:ip] = client_ip @request[:channels] = (@request[:channels] || []).compact.select {|c| !!c && c != '' }.uniq if @request[:client_ids] @request[:client_ids] = @request[:client_ids].to_a.compact.select {|c| !!c && c != '' }.uniq end case @request[:command].to_sym when :broadcast: broadcast_command when :subscribe: subscribe_command when :query: query_command when :noop: noop_command else raise InvalidCommand, @request end rescue JuggernautError => e logger.error("#{e} - #{e.message.inspect}") close_connection # So as to stop em quitting rescue => e logger ? logger.error(e) : puts(e) end def unbind if @client # todo - should be called after timeout? @client.logout_connection_request(@channels) logger.debug "Lost client #{@client.friendly_id}" end mark_dead('Unbind called') end def add_http_response_headers(msg) h = [] h << "HTTP/1.1 200 OK" h << "Cache-Control: private, max-age=0, must-revalidate" h << "Content-Type: application/javascript" h << "Connection: keep-alive" h << "Content-Length: #{msg.to_s.size}" h = h.join("\r\n") h + "\r\n\r\n" + msg.to_s + "\r\n" end # As far as I'm aware, send_data # never throws an exception def publish(msg) logger.debug "Sending msg: #{msg.to_s} to client #{@request[:client_id]} (session #{@request[:session_id]})" if @http_request msg = add_http_response_headers(msg) send_data(msg) close_connection_after_writing else send_data(msg.to_s + CR) end end # Connection methods def broadcast(bdy, timestamp = nil) msg = Juggernaut::Message.new(@current_msg_id += 1, bdy, self.signature, timestamp) publish(msg) end def mark_dead(reason = "Unknown error") # Once dead, a client never recovers since a reconnection # attempt would hook onto a new em instance. A client # usually dies through an unbind @connected = false @client.remove_connection(self) if @client end def alive? @connected == true end def has_channels?(channels) channels.each {|channel| return true if has_channel?(channel) } false end def has_channel?(channel) @channels.include?(channel) end def add_channel(chan_name) return if !chan_name or chan_name == '' @channels << chan_name unless has_channel?(chan_name) end def add_channels(chan_names) chan_names.to_a.each do |chan_name| add_channel(chan_name) end end def remove_channel!(chan_name) @channels.delete(chan_name) end def remove_channels!(chan_names) chan_names.to_a.each do |chan_name| remove_channel!(chan_name) end end protected # Commands def broadcast_command raise MalformedBroadcast, @request unless @request[:type] raise UnauthorisedBroadcast, @request unless authenticate_broadcast_or_query case @request[:type].to_sym when :to_channels # if channels is a blank array, sends to everybody! broadcast_to_channels(@request[:body], @request[:channels]) when :to_clients broadcast_needs :client_ids @request[:client_ids].each do |client_id| # if channels aren't empty, scopes broadcast to clients on those channels broadcast_to_client(@request[:body], client_id, @request[:channels]) end else raise MalformedBroadcast, @request end end def query_command raise MalformedQuery, @request unless @request[:type] raise UnauthorisedQuery, @request unless authenticate_broadcast_or_query case @request[:type].to_sym when :remove_channels_from_all_clients query_needs :channels clients = Juggernaut::Client.find_all clients.each {|client| client.remove_channels!(@request[:channels]) } when :remove_channels_from_client query_needs :client_ids, :channels @request[:client_ids].each do |client_id| client = Juggernaut::Client.find_by_id(client_id) client.remove_channels!(@request[:channels]) if client end when :show_channels_for_client query_needs :client_id if client = Juggernaut::Client.find_by_id(@request[:client_id]) publish client.channels.to_json else publish nil.to_json end when :show_clients if @request[:client_ids] and @request[:client_ids].any? clients = @request[:client_ids].collect{ |client_id| Client.find_by_id(client_id) }.compact.uniq else clients = Juggernaut::Client.find_all end publish clients.to_json when :show_client query_needs :client_id publish Juggernaut::Client.find_by_id(@request[:client_id]).to_json when :show_clients_for_channels query_needs :channels publish Juggernaut::Client.find_by_channels(@request[:channels]).to_json else raise MalformedQuery, @request end end def noop_command logger.debug "NOOP" publish("NOOP") end def subscribe_command logger.debug "SUBSCRIBE: #{@request.inspect}" if channels = @request[:channels] add_channels(channels) end @client = Juggernaut::Client.find_or_create(self, @request) if !@client.subscription_request(@channels) raise UnauthorisedSubscription, @client end if options[:store_messages] @client.send_queued_messages(self) end end private # Different broadcast types def broadcast_to_channels(msg, channels = []) Juggernaut::Client.find_all.each {|client| client.send_message(msg, channels) } end def broadcast_to_client(body, client_id, channels) client = Juggernaut::Client.find_by_id(client_id) client.send_message(body, channels) if client end # Helper methods def broadcast_needs(*args) args.each do |arg| raise MalformedBroadcast, @request unless @request.has_key?(arg) end end def subscribe_needs(*args) args.each do |arg| raise MalformedSubscribe, @request unless @request.has_key?(arg) end end def query_needs(*args) args.each do |arg| raise MalformedQuery, @request unless @request.has_key?(arg) end end def authenticate_broadcast_or_query if options[:allowed_ips] return true if options[:allowed_ips].include?(client_ip) elsif !@request[:secret_key] return true if broadcast_query_request elsif options[:secret_key] return true if @request[:secret_key] == options[:secret_key] end if !options[:allowed_ips] and !options[:secret_key] and !options[:broadcast_query_login_url] return true end false end def broadcast_query_request return false unless options[:broadcast_query_login_url] url = URI.parse(options[:broadcast_query_login_url]) params = [] params << "client_id=#{@request[:client_id]}" if @request[:client_id] params << "session_id=#{URI.escape(@request[:session_id])}" if @request[:session_id] params << "type=#{@request[:type]}" params << "command=#{@request[:command]}" (@request[:channels] || []).each {|chan| params << "channels[]=#{chan}" } url.query = params.join('&') begin open(url.to_s, "User-Agent" => "Ruby/#{RUBY_VERSION}") rescue Timeout::Error return false rescue return false end true end def client_ip Socket.unpack_sockaddr_in(get_peername)[1] rescue nil end end end