require 'eventmachine'
require 'socket'
require 'json'
require 'open-uri'
require 'fileutils'
require 'digest/sha1'

module Juggernaut
  # EventMachine connection handler for a single socket.
  #
  # Incoming bytes are buffered until a "\0" terminator is seen; each complete
  # packet is parsed as a JSON request and dispatched on its "command" value
  # (broadcast, subscribe or query). Outgoing messages are written back with
  # the same "\0" terminator.
  #
  # `logger` and `options` are not defined in this file; they are presumably
  # supplied by Juggernaut::Miscel (TODO confirm).
  class Server < EventMachine::Connection
    include Juggernaut::Miscel

    class InvalidRequest < Juggernaut::JuggernautError #:nodoc:
    end

    class InvalidCommand < Juggernaut::JuggernautError #:nodoc:
    end

    class CorruptJSON < Juggernaut::JuggernautError #:nodoc:
    end

    class MalformedBroadcast < Juggernaut::JuggernautError #:nodoc:
    end

    class MalformedSubscribe < Juggernaut::JuggernautError #:nodoc:
    end

    class UnauthorisedSubscription < Juggernaut::JuggernautError #:nodoc:
    end

    class MalformedQuery < Juggernaut::JuggernautError #:nodoc:
    end

    class UnauthorisedBroadcast < Juggernaut::JuggernautError #:nodoc:
    end

    class UnauthorisedQuery < Juggernaut::JuggernautError #:nodoc:
    end

    # NOTE(review): POLICY_FILE and POLICY_REQUEST are both empty in this copy
    # of the file, yet process_message substitutes 'PORT' inside POLICY_FILE.
    # Presumably their markup content was lost somewhere - confirm against the
    # upstream source before relying on the policy-request branch.
    POLICY_FILE = <<-EOF
    EOF

    POLICY_REQUEST = ""

    # Packet terminator appended to every outgoing message.
    CR = "\0"

    attr_reader :current_msg_id
    attr_reader :messages
    attr_reader :connected
    attr_reader :logout_timeout
    attr_reader :status
    attr_reader :channels

    # EM methods

    # Called by EventMachine for a new connection; resets per-connection state.
    def post_init
      logger.debug "New client [#{client_ip}]"
      @channels       = []
      @messages       = []
      @current_msg_id = 0
      @connected      = true
      @logout_timeout = nil
      @buffer         = ''
    end

    # Juggernaut packets are terminated with "\0"
    # so we need to buffer the data until we find the
    # terminating "\0"
    def receive_data(data)
      logger.debug "Receiving data: #{data}"
      @buffer << data
      @buffer = process_whole_messages(@buffer)
    end

    # Process any whole messages in the buffer,
    # and return the new contents of the buffer
    # (the trailing incomplete message, or an empty string).
    def process_whole_messages(data)
      # only process if data contains a \0 char
      return data unless data.include?("\0")

      messages = data.split("\0")
      # Check the real end of the buffer. The previous /\0$/ test also matched
      # a "\0" followed by a newline in the middle of the data, which made an
      # unterminated trailing fragment get processed as a complete message.
      if data.end_with?("\0")
        data = ''
      else
        # remove the last message from the list (because it is incomplete) before processing
        data = messages.pop
      end

      messages.each { |message| process_message(message.strip) }
      data
    end

    # Handles one complete packet.
    #
    # A packet equal to POLICY_REQUEST is answered with POLICY_FILE and the
    # connection is closed after writing. Anything else must be a JSON object
    # whose "command" is broadcast, subscribe or query.
    #
    # JuggernautError subclasses are logged and close the connection; any other
    # StandardError is logged and swallowed.
    def process_message(ln)
      logger.debug "Processing message: #{ln}"
      @request = nil

      if ln == POLICY_REQUEST
        logger.debug "Sending crossdomain file"
        send_data POLICY_FILE.gsub('PORT', (options[:public_port] || options[:port]).to_s)
        close_connection_after_writing
        return
      end

      begin
        @request = JSON.parse(ln) unless ln.empty?
      rescue
        raise CorruptJSON, ln
      end

      # Valid JSON that is not an object (e.g. an array) cannot be a request.
      raise InvalidRequest, ln unless @request.is_a?(Hash)

      @request.symbolize_keys!

      # For debugging
      @request[:ip] = client_ip

      @request[:channels] = sanitize_list(@request[:channels])
      @request[:client_ids] = sanitize_list(@request[:client_ids]) if @request[:client_ids]

      # Compare as strings: a missing command becomes "" and falls through to
      # InvalidCommand, and untrusted input is never turned into a symbol.
      case @request[:command].to_s
      when 'broadcast' then broadcast_command
      when 'subscribe' then subscribe_command
      when 'query'     then query_command
      else
        raise InvalidCommand, @request
      end
    rescue JuggernautError => e
      logger.error("#{e} - #{e.message.inspect}")
      close_connection
    # So as to stop em quitting
    rescue => e
      logger ? logger.error(e) : puts(e)
    end

    # Called by EventMachine when the connection goes away.
    def unbind
      # todo - should be called after timeout?
      @client.logout_connection_request(@channels) if @client
      logger.debug "Lost client: #{@client.id}" if @client
      mark_dead('Unbind called')
    end

    # As far as I'm aware, send_data
    # never throws an exception
    def publish(msg)
      logger.debug "Sending msg: #{msg.to_s}"
      logger.debug "To client: #{@client.id}" if @client
      send_data(msg.to_s + CR)
    end

    # Connection methods

    # Wraps +bdy+ in a Juggernaut::Message carrying the next message id and
    # this connection's signature, stores it when options[:store_messages] is
    # set, and publishes it.
    def broadcast(bdy)
      msg = Juggernaut::Message.new(@current_msg_id += 1, bdy, self.signature)
      @messages << msg if options[:store_messages]
      publish(msg)
    end

    def mark_dead(reason = "Unknown error")
      # Once dead, a client never recovers since a reconnection
      # attempt would hook onto a new em instance. A client
      # usually dies through an unbind
      @connected = false
      @logout_timeout = Time.now + (options[:timeout] || 30)
      @status = "DEAD: %s: Could potentially logout at %s" % [reason, @logout_timeout]
    end

    # True until mark_dead has been called.
    def alive?
      @connected == true
    end

    # True if this connection is on at least one of +channels+.
    def has_channels?(channels)
      channels.any? { |channel| has_channel?(channel) }
    end

    def has_channel?(channel)
      @channels.include?(channel)
    end

    # Adds +chan_name+ unless it is nil, false, empty or already present.
    def add_channel(chan_name)
      return if !chan_name || chan_name == ''
      @channels << chan_name unless has_channel?(chan_name)
    end

    # Accepts nil, a single name or a list of names.
    def add_channels(chan_names)
      Array(chan_names).each { |chan_name| add_channel(chan_name) }
    end

    def remove_channel!(chan_name)
      @channels.delete(chan_name)
    end

    # Accepts nil, a single name or a list of names.
    def remove_channels!(chan_names)
      Array(chan_names).each { |chan_name| remove_channel!(chan_name) }
    end

    # Re-publishes, on this connection, the stored messages of the client
    # found via +signature_id+ whose ids lie between +msg_id+ and that client's
    # current message id.
    #
    # Does nothing unless both arguments are given, a client is found, and
    # +msg_id+ is below the client's current message id. Integer() still
    # raises for a +msg_id+ that is not numeric.
    def broadcast_all_messages_from(msg_id, signature_id)
      # Both values are needed: Integer(nil) raises, and the lookup is keyed on
      # the signature the caller passed in (it used to be ignored).
      return unless msg_id && signature_id

      client = Juggernaut::Client.find_by_signature(signature_id)
      return unless client

      msg_id = Integer(msg_id)
      return if msg_id >= client.current_msg_id

      missed = client.messages.select do |msg|
        (msg_id..client.current_msg_id).include?(msg.id)
      end
      missed.each { |msg| publish(msg) }
    end

    # todo - how should this be called - if at all?
    def clean_up_old_messages(how_many_to_keep = 1000)
      # We need to shift, as we want to remove the oldest first
      @messages.shift while @messages.length > how_many_to_keep
    end

    protected

    # Commands

    # Dispatches a broadcast request on its "type" (to_channels / to_clients).
    # Raises MalformedBroadcast for a missing or unknown type and
    # UnauthorisedBroadcast when authentication fails.
    def broadcast_command
      raise MalformedBroadcast, @request unless @request[:type]

      raise UnauthorisedBroadcast, @request unless authenticate_broadcast_or_query

      case @request[:type].to_s
      when 'to_channels'
        # if channels is a blank array, sends to everybody!
        broadcast_to_channels(@request[:body], @request[:channels])
      when 'to_clients'
        broadcast_needs :client_ids
        @request[:client_ids].each do |client_id|
          # if channels aren't empty, scopes broadcast to clients on those channels
          broadcast_to_client(@request[:body], client_id, @request[:channels])
        end
      else
        raise MalformedBroadcast, @request
      end
    end

    # Dispatches a query request on its "type".
    # Raises MalformedQuery for a missing or unknown type (or missing required
    # keys) and UnauthorisedQuery when authentication fails.
    def query_command
      raise MalformedQuery, @request unless @request[:type]

      raise UnauthorisedQuery, @request unless authenticate_broadcast_or_query

      case @request[:type].to_s
      when 'remove_channels_from_all_clients'
        query_needs :channels
        Juggernaut::Client.find_all.each do |client|
          client.remove_channels!(@request[:channels])
        end
      when 'remove_channels_from_client'
        query_needs :client_ids, :channels
        @request[:client_ids].each do |client_id|
          client = Juggernaut::Client.find_by_id(client_id)
          client.remove_channels!(@request[:channels]) if client
        end
      when 'show_users'
        if @request[:client_ids] && @request[:client_ids].any?
          clients = @request[:client_ids].map { |client_id|
            Juggernaut::Client.find_by_id(client_id)
          }.compact.uniq
        else
          clients = Juggernaut::Client.find_all
        end
        publish clients.to_json
      when 'show_user'
        query_needs :client_id
        publish Juggernaut::Client.find_by_id(@request[:client_id]).to_json
      when 'show_users_for_channel'
        query_needs :channels
        publish Juggernaut::Client.find_by_channels(@request[:channels]).to_json
      else
        raise MalformedQuery, @request
      end
    end

    # Registers this connection's channels, attaches it to a client and, when
    # options[:store_messages] is set, replays messages after last_msg_id.
    # Raises UnauthorisedSubscription if the client rejects the subscription.
    def subscribe_command
      requested = @request[:channels]
      add_channels(requested) if requested

      @client = Juggernaut::Client.find_or_create(self, @request)

      unless @client.subscription_request(@channels)
        raise UnauthorisedSubscription, @client
      end

      Juggernaut::Client.add_client(@client)

      if options[:store_messages]
        broadcast_all_messages_from(@request[:last_msg_id], @request[:signature])
      end
    end

    private

    # Different broadcast types

    def broadcast_to_channels(msg, channels = [])
      Juggernaut::Client.find_all.each { |client| client.send_message(msg, channels) }
    end

    def broadcast_to_client(body, client_id, channels)
      client = Juggernaut::Client.find_by_id(client_id)
      client.send_message(body, channels) if client
    end

    # Helper methods

    # Coerces +values+ to an array and drops nil, false, empty-string and
    # duplicate entries.
    def sanitize_list(values)
      Array(values).compact.select { |value| value && value != '' }.uniq
    end

    def broadcast_needs(*args)
      args.each do |arg|
        raise MalformedBroadcast, @request unless @request.has_key?(arg)
      end
    end

    def subscribe_needs(*args)
      args.each do |arg|
        raise MalformedSubscribe, @request unless @request.has_key?(arg)
      end
    end

    def query_needs(*args)
      args.each do |arg|
        raise MalformedQuery, @request unless @request.has_key?(arg)
      end
    end

    # Decides whether the current request may broadcast or query.
    #
    # Checked in order: IP allow-list if configured; otherwise, when the
    # request carries no secret key, the login URL callback; otherwise the
    # configured secret key. With none of the three mechanisms configured,
    # every request is allowed.
    def authenticate_broadcast_or_query
      if options[:allowed_ips]
        return true if options[:allowed_ips].include?(client_ip)
      elsif !@request[:secret_key]
        return true if broadcast_query_request
      elsif options[:secret_key]
        # Uses the options accessor like every other lookup in this class;
        # @options is never assigned here.
        return true if @request[:secret_key] == options[:secret_key]
      end

      nothing_configured = !(options[:allowed_ips] ||
                             options[:secret_key] ||
                             options[:broadcast_query_login_url])
      return true if nothing_configured

      false
    end

    # Asks options[:broadcast_query_login_url] whether the request is allowed.
    # Returns true when the HTTP request completes without raising, false when
    # no URL is configured or the request fails.
    def broadcast_query_request
      return false unless options[:broadcast_query_login_url]

      url = URI.parse(options[:broadcast_query_login_url])
      params = []
      params << "client_id=#{escape_param(@request[:client_id])}" if @request[:client_id]
      params << "session_id=#{escape_param(@request[:session_id])}" if @request[:session_id]
      params << "type=#{escape_param(@request[:type])}"
      params << "command=#{escape_param(@request[:command])}"
      (@request[:channels] || []).each { |chan| params << "channels[]=#{escape_param(chan)}" }
      url.query = params.join('&')

      begin
        fetch_login_url(url.to_s)
      rescue
        return false
      end
      true
    end

    # Escapes a request-supplied value so it cannot inject extra query
    # parameters into the login URL.
    def escape_param(value)
      URI.encode_www_form_component(value.to_s)
    end

    # Performs the GET and closes the response handle (block form).
    # Kernel#open no longer opens URLs on Ruby 3.0+, so URI.open is used when
    # open-uri provides it; older Rubies fall back to Kernel#open.
    def fetch_login_url(address)
      headers = { "User-Agent" => "Ruby/#{RUBY_VERSION}" }
      if URI.respond_to?(:open)
        URI.open(address, headers) { |_response| }
      else
        open(address, headers) { |_response| }
      end
    end

    # IP address of the remote peer.
    def client_ip
      Socket.unpack_sockaddr_in(get_peername)[1]
    end
  end
end