lib/pubnub/client.rb in pubnub-3.4.1 vs lib/pubnub/client.rb in pubnub-3.5.1

- old
+ new

@@ -1,535 +1,256 @@ -require 'pubnub/configuration.rb' -require 'pubnub/subscription.rb' -require 'em-http-request' -require 'httparty' -require 'persistent_httparty' -require 'timeout' +require 'pubnub/configuration' +require 'pubnub/parser' +require 'pubnub/envelope' +require 'pubnub/crypto' require 'pubnub/uuid' +require 'pubnub/exceptions' +require 'pubnub/event' +require 'pubnub/formatter' +require 'pubnub/validator' +require 'pubnub/pam' module Pubnub - class PubNubHTTParty - include HTTParty - default_timeout 310 - - def first_run? - @first_run ? true : false - end - - def send_request(path, options={}, &block) - if @first_run.nil? - @first_run = true - else - @first_run = false - end - self.class.get path, options - - end - end - class Client include Configuration - attr_accessor :uuid, :cipher_key, :host, :query, :response, :timetoken, :url, :operation, :callback, :publish_key, :subscribe_key, :secret_key, :channel, :jsonp, :message, :ssl, :port - attr_accessor :close_connection, :history_limit, :history_count, :history_start, :history_end, :history_reverse, :session_uuid, :last_timetoken, :origin, :error + attr_reader :env + attr_accessor :single_event_connections_pool, :subscribe_event_connections_pool, :uuid, :async_events + EVENTS = %w(publish subscribe presence leave history here_now audit grant revoke time) + VERSION = Pubnub::VERSION - DEFAULT_CONNECT_CALLBACK = lambda { |msg| $log.info "CONNECTED: #{msg}" } - DEFAULT_ERROR_CALLBACK = lambda { |msg| $log.error "AN ERROR OCCURRED: #{msg}" } + EVENTS.each do |event_name| + require File.join('pubnub', 'events', event_name) + end - def initialize(options = {}) - $log = options[:logger] - $log = Logger.new('pubnub.log', 0, 100 * 1024 * 1024) unless $log + EVENTS.each do |event_name| + define_method event_name do |params, &block| + params[:callback] = block if params[:callback].nil? + event = Pubnub.const_get(classify_method(event_name)).new(params, self) + $logger.debug('Created event ' + event.class.to_s) + #if params[:http_sync] + event.fire(self) + #else + # EM.defer { + # begin + # event.fire(self) + # rescue => e + # puts e + # end + # } + #end + end + end + alias_method :unsubscribe, :leave - @subscriptions = Array.new + def initialize(options) + validate!(options) + setup_app(options) + # From this moment we have to use @env in that method instead of options + create_connections_pools(@env) + create_subscriptions_pools(@env) + start_event_machine(@env) + end - @subscription_request = nil - @retry = true - @retry_count = 0 - @callback = options[:callback] - @error_callback = options[:error_callback] - @error_callback = DEFAULT_ERROR_CALLBACK unless @error_callback - @connect_callback = options[:connect_callback] - @connect_callback = DEFAULT_CONNECT_CALLBACK unless @connect_callback - @cipher_key = options[:cipher_key] - @publish_key = options[:publish_key] || DEFAULT_PUBLISH_KEY - @subscribe_key = options[:subscribe_key] || DEFAULT_SUBSCRIBE_KEY - @channel = options[:channel] || DEFAULT_CHANNEL - @message = options[:message] - @ssl = options[:ssl] - @secret_key = options[:secret_key] - @timetoken = options[:timetoken] - @session_uuid = options[:uuid] || options[:session_uuid] || UUID.new.generate + def shutdown(stop_em = false) + @single_event_connections_pool.each do |origin, conn| + conn.shutdown_in_all_threads + conn = nil + end + @single_event_connections_pool = Hash.new - @history_count = options[:count] - @history_start = options[:start] - @history_end = options[:end] - @history_reverse = options[:reverse] + @subscribe_event_connections_pool.each do |origin, conn| + conn.shutdown_in_all_threads + conn = nil + end + @subscribe_event_connections_pool = Hash.new - @port = options[:port] - @url = options[:url] - @origin = options[:origin] - @origin = DEFAULT_ORIGIN unless @origin - @query = options[:query] + @env[:callback_pool] = Hash.new + @env[:subscriptions] = Hash.new - @http_sync = options[:http_sync] + EM.stop if stop_em - @max_retries = options[:max_retries] - @max_retries = MAX_RETRIES unless @max_retries - - @non_subscribe_timeout = options[:non_subscribe_timeout] - @non_subscribe_timeout = 5 unless @non_subscribe_timeout - - @reconnect_max_attempts = options[:reconnect_max_attempts] - @reconnect_max_attempts = 60 unless @reconnect_max_attempts - - @reconnect_retry_interval = options[:reconnect_retry_interval] - @reconnect_retry_interval = 5 unless @reconnect_retry_interval - - @reconnect_response_timeout = options[:reconnect_response_timeout] - @reconnect_response_timeout = 5 unless @reconnect_response_timeout - - @sync_connection_sub = Pubnub::PubNubHTTParty.new - @sync_connection = Pubnub::PubNubHTTParty.new - - @pause_subscribe = false + $logger.info('Bye!') end - def publish(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'publish') - verify_operation('publish', options) - start_request options + def start_subscribe(override = false) - end + start_event_machine - def subscribe(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'subscribe') - verify_operation('subscribe', options) - @error_callback.call 'YOU ARE ALREADY SUBSCRIBED TO THAT CHANNEL' if get_channels_for_subscription.include? options[:channel] - $log.error 'YOU ARE ALREADY SUBSCRIBED TO THAT CHANNEL' if get_channels_for_subscription.include? options[:channel] - start_request options - end - - def presence(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'presence') - verify_operation('presence', options) - start_request options - end - - def history(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'history') - verify_operation('history', options) - options[:params].merge!({:count => options[:count]}) - options[:params].merge!({:start => options[:start]}) unless options[:start].nil? - options[:params].merge!({:end => options[:end]}) unless options[:end].nil? - options[:params].merge!({:reverse => 'true'}) if options[:reverse] - start_request options - end - - def leave(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'leave') - verify_operation('leave', options) - return false unless get_channels_for_subscription.include? options[:channel] - remove_from_subscription options[:channel] - if @subscriptions.empty? - @timetoken = 0 - @subscription_request.timetoken = 0 - @subscribe_connection.close - @wait_for_response = false + if override + $logger.debug('Pubnub::Client#start_subscribe | Override') + @env[:subscribe_railgun].cancel + @env[:subscribe_railgun] = nil + @env[:wait_for_response].each do |k,v| + @env[:wait_for_response][k] = false + end end - start_request options - end + @env[:wait_for_response] = Hash.new unless @wait_for_response + @env[:subscribe_railgun] = EM.add_periodic_timer(PERIODIC_TIMER_INTERVAL) do + begin + @env[:subscriptions].each do |origin, subscribe| + unless @env[:wait_for_response][origin] + @env[:wait_for_response][origin] = true - alias_method :unsubscribe, :leave + $logger.debug('Async subscription running') + $logger.debug("origin: #{origin}") + $logger.debug("timetoken: #{@env[:timetoken]}") - def here_now(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'here_now') - verify_operation('here_now', options) - start_request options - end + EM.defer do + subscribe.start_event(self) if subscribe + # @env[:wait_for_response][origin] = false # moved to Event + end - def time(options = {}, &block) - options[:callback] = block if block_given? - options = merge_options(options, 'time') - verify_operation('time', options) - start_request options + end + end + rescue => e + $logger.error(e) + $logger.error(e.backtrace) + end + end unless @env[:subscribe_railgun] end def subscription_running? - @subscription_running + @env[:subscribe_railgun] && !@env[:subscriptions].empty? ? true : false end - def active_subscriptions - @subscription_request + def create_subscriptions_pools(env) + @env[:subscriptions] = Hash.new + @env[:callbacks_pool] = Hash.new + @env[:error_callbacks_pool] = Hash.new end - private - - def remove_from_subscription(channel) - @subscriptions.delete_if { |s| s.channel.to_s == channel.to_s } + def update_timetoken(timetoken) + @env[:timetoken] = timetoken.to_i + $logger.debug("Pubnub::Client#update_timetoken | Current timetoken is eq #{@env[:timetoken]}") end - def merge_options(options = {}, operation = '') - options[:channel] = compile_channel_parameter(options[:channel],options[:channels]) if options[:channel] || options[:channels] - return { - :ssl => @ssl, - :cipher_key => @cipher_key, - :publish_key => @publish_key, - :subscribe_key => @subscribe_key, - :secret_key => @secret_key, - :origin => @origin, - :operation => operation, - :params => { :uuid => @session_uuid }, - :timetoken => @timetoken, - :error_callback=> @error_callback - }.merge(options) + def set_uuid(uuid) + leave_all unless @env[:subscriptions].empty? + @env[:uuid] = uuid + start_subscribe(true) unless @env[:subscriptions].empty? end + alias_method :session_uuid=, :set_uuid + alias_method :uuid=, :set_uuid - def start_em_if_not_running - Thread.new do - EM.run - end unless EM.reactor_running? - - until EM.reactor_running? do end + def set_auth_key(auth_key) + leave_all unless @env[:subscriptions].empty? + @env[:auth_key] = auth_key + start_subscribe(true) unless @env[:subscriptions].empty? end + alias_method :auth_key=, :set_auth_key - def get_channels_for_subscription - @subscriptions.map do |sub| - sub.channel - end + def set_cipher_key(cipher_key) + @env[:cipher_key] = cipher_key end + alias_method :cipher_key=, :set_cipher_key - def fire_subscriptions_callback_for(envelope) - @subscriptions.each do |subscription| - subscription.fire_callback_for envelope - end - end - - def start_request(options) - request = Pubnub::Request.new(options) - unless options[:http_sync] - start_em_if_not_running - - if %w(subscribe presence).include? request.operation - options[:channel].split(',').each do |channel| - @subscriptions << Subscription.new(:channel => channel, :callback => options[:callback], :error_callback => options[:error_callback]) unless get_channels_for_subscription.include? channel - end - - @subscription_request = request unless @subscription_request - - if @subscription_request.channel != get_channels_for_subscription.join(',') && @subscription_running - @subscribe_connection.close - @timetoken = 0 - @subscription_request.timetoken = 0 - @wait_for_response = false - end - - @subscription_request.channel = get_channels_for_subscription.join(',') - - @subscription_running = EM.add_periodic_timer(PERIODIC_TIMER) do - unless @wait_for_response || get_channels_for_subscription.empty? - @wait_for_response = true - $log.debug 'SETTING CHANNELS' - @subscription_request.channel = get_channels_for_subscription.join(',') - $log.debug 'SENDING SUBSCRIBE REQUEST' - http = send_request(@subscription_request) - - http.callback do - $log.debug 'GOT SUBSCRIBE RESPONSE' - if http.response_header.status.to_i == 200 - $log.debug 'STATUS 200' - if is_valid_json?(http.response) - $log.debug 'GOT VALID JSON' - @subscription_request.handle_response(http) - $log.debug 'HANDLED RESPONSE' - if is_update?(@subscription_request.timetoken) - $log.debug 'TIMETOKEN UPDATED' - @subscription_request.envelopes.each do |envelope| - fire_subscriptions_callback_for envelope - end - else - $log.debug 'TIMETOKEN NOT UPDATED' - end - end - else - if request.error_callback - request.error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json, - :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json - ) - else - fire_subscriptions_callback_for Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json, - :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json - ) - end - - end - - @wait_for_response = false - end - - http.errback do - $log.error 'GOT SUBSCRIBE ERROR' - @error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, http.error].to_json, - :response => [0, http.error].to_json - ) - $log.error 'CALLED ERROR CALLBACK' - - @wait_for_response = false - end - end - end unless @subscription_running - else - EM.next_tick do - $log.debug 'SENDING OTHER REQUEST' - - http = send_request(request) - - http.errback do - @error_callback.call [0, http.error] - end - - http.callback do - $log.debug 'GOT OTHER RESPONSE' - #byebug - if http.response_header.status.to_i == 200 - if is_valid_json?(http.response) - request.handle_response(http) - request.envelopes.each do |envelope| - $log.debug 'CALLING PARAMETER CALLBACK' - request.callback.call envelope - end - end - else - begin - request.handle_response(http) - request.envelopes.each do |envelope| - if request.error_callback - request.error_callback.call envelope - else - @error_callback.call envelope - end - end - - rescue - if request.error_callback - request.error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json, - :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json - ) - else - @error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json, - :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json - ) - end - end - end - end - end - end + def start_railgun + if @env[:railgun] + $logger.debug('Pubnub::Client#start_railgun | Railgun already initialized') else - begin - if @timetoken.to_i == 0 && request.operation == 'subscribe' - time(:http_sync => true){|envelope| @timetoken = envelope.message.to_i } - end - begin - if request.query.to_s.empty? - if %w(subscribe presence).include? request.operation - response = @sync_connection_sub.send_request(request.origin + request.path, :timeout => 370) - else - response = @sync_connection.send_request(request.origin + request.path, :timeout => @non_subscribe_timeout) - end - else - if %w(subscribe presence).include? request.operation - response = @sync_connection_sub.send_request(request.origin + request.path, :query => request.query, :timeout => 370) - else - response = @sync_connection.send_request(request.origin + request.path, :query => request.query, :timeout => @non_subscribe_timeout) - end + $logger.debug('Pubnub::Client#start_railgun | Initializing railgun') + @env[:railgun] = EM.add_periodic_timer(0.01) do + @async_events.each do |event| + EM.defer do + event.fire(self) unless event.fired end - rescue - msg = 'ERROR SENDING REQUEST' - @error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, msg].to_s, - :response => [0, msg].to_s - ) - @retries = 0 unless @retries - @retries += 1 - if @retries <= @max_retries - return start_request options - else - msg = "ERROR SENDING REQUEST AFTER #{@retries} RETRIES" - @retries = 0 - return Pubnub::Response.new( - :error_init => true, - :message => [0, msg].to_s, - :response => [0, msg].to_s - ) - end end - - if @sync_connection_sub.first_run? - @connect_callback.call 'SYNC CONNECTION ESTABLISHED' - end - if response.response.code.to_i == 200 - if is_valid_json?(response.body) - request.handle_response(response) - @timetoken = request.timetoken - - if request.operation == 'leave' - Subscription.remove_from_subscription request.channel - end - - if !request.callback.nil? - request.envelopes.each do |envelope| - request.callback.call envelope - end - else - if %w(publish leave here_now time).include? request.operation - return request.envelopes[0] - else - return request.envelopes - end - end - end - else - begin - request.handle_response(response) - if !request.callback.nil? - request.envelopes.each do |envelope| - request.callback.call envelope - end - else - if %w(publish leave here_now time).include? request.operation - return request.envelopes[0] - else - return request.envelopes - end - end - rescue - if request.error_callback - request.error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{response.response.code.to_i}"].to_json, - :response => [0, "Bad server response: #{response.response.code.to_i}"].to_json - ) - else - @error_callback.call Pubnub::Response.new( - :error_init => true, - :message => [0, "Bad server response: #{response.response.code.to_i}"].to_json, - :response => [0, "Bad server response: #{response.response.code.to_i}"].to_json - ) - end - end - - if @sync_retries - @sync_retries += 1 - else - @sync_retries = 1 - end - - if @sync_retries < @max_retries - start_request options - end - end - rescue Timeout::Error - if request.error_callback - request.error_callback.call [0, 'TIMEOUT'] - else - @error_callback.call [0, 'TIMEOUT'] - end + @async_events.delete_if {|event| event.finished } end end end - def send_request(request) - if %w(subscribe presence).include? request.operation - unless @subscribe_connection - @subscribe_connection = EM::HttpRequest.new(request.origin, :connect_timeout => 370, :inactivity_timeout => 370) - connection = @subscribe_connection.get :path => '/time/0', :keepalive => true, :query => request.query - connection.callback do - @connect_callback.call 'ASYNC SUBSCRIBE CONNECTION' - end - end + private - @subscribe_connection.get :path => request.path, :query => request.query, :keepalive => true - else - unless @connection - @connection = EM::HttpRequest.new request.origin - end - @connection.get :path => request.path, :query => request.query, :keepalive => true + def leave_all + @env[:subscriptions].each do |origin, subscribe| + leave( + :origin => origin, + :channel => subscribe.get_channels.map{|c| c.to_s}.join(','), + :http_sync => true, + :skip_remove => true + ) + + subscribe.set_timetoken(0) end end - def is_update?(timetoken) - if @timetoken.to_i < timetoken.to_i - @timetoken = timetoken + def start_event_machine(options = nil) + $logger.debug 'Pubnub::Client#start_event_machine | starting EM in new thread' + if defined?(Thin) + $logger.debug('Pubnub::Client#start_event_machine | We\'re running on thin') else - false + $logger.debug('Pubnub::Client#start_event_machine | We aren\'t running on thin') end + if EM.reactor_running? + $logger.debug 'Pubnub::Client#start_event_machine | EM already running' + else + Thread.new { EM.run {} } + $logger.debug 'Pubnub::Client#start_event_machine | EM started in new thread' + end + end + def setup_app(options) + $logger = options[:logger] || Logger.new('pubnub.log') + @env = symbolize_options_keys(options) + @env = set_default_values(@env) + @async_events = Array.new + $logger.debug("\n\nCreated new Pubnub::Client instance") end - def is_valid_json?(response) - begin - JSON.parse(response) - valid = true - rescue - valid = false - end - valid + def create_connections_pools(options) + @subscribe_event_connections_pool = Hash.new + @single_event_connections_pool = Hash.new end - def verify_operation(operation, options) - case operation - when 'publish' - raise(ArgumentError, 'publish() requires :channel, :message parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) && options[:message] - when 'subscribe' - raise(ArgumentError, 'subscribe() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) - when 'presence' - raise(ArgumentError, 'presence() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) - when 'time' - raise(ArgumentError, 'time() require, if async, callback parameter or block given.') unless (options[:callback] || options[:block_given] || options[:http_sync]) - when 'history' - raise(ArgumentError, 'history() requires :channel, :count parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) && options[:count] - when 'here_now' - raise(ArgumentError, 'here_now() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) - when 'leave' - raise(ArgumentError, 'leave() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) - end + def set_default_values(env) + defaults = { + :error_callback => DEFAULT_ERROR_CALLBACK, + :connect_callback => DEFAULT_CONNECT_CALLBACK, + :ssl => DEFAULT_SSL, + :timetoken => DEFAULT_TIMETOKEN, + :uuid => UUID.new.generate, + :port => DEFAULT_CONNECTION_PORT, + :origin => DEFAULT_ORIGIN, + :subscribe_timeout => DEFAULT_SUBSCRIBE_TIMEOUT, + :timeout => DEFAULT_NON_SUBSCRIBE_TIMEOUT, + :max_retries => MAX_RETRIES, + :non_subscribe_timeout => DEFAULT_NON_SUBSCRIBE_TIMEOUT, + :reconnect_max_attempts => DEFAULT_RECONNECT_ATTEMPTS, + :reconnect_retry_interval => DEFAULT_RECONNECT_INTERVAL, + :reconnect_response_timeout => DEFAULT_RECONNECT_RESPONSE_TIMEOUT, + :ttl => DEFAULT_TTL, + :secret_key => 0 + } - unless options[:callback].nil? - raise('callback is invalid.') unless options[:callback].respond_to? 'call' + # Let's fill missing keys with default values + $logger.debug('Setting default values') + defaults.each do |key,default_value| + env[key] = default_value if @env[key].nil? end - unless options[:error_callback].nil? - raise('error_callback is invalid.') unless options[:error_callback].respond_to? 'call' - end + env + end + def symbolize_options_keys(options) + $logger.debug('Symbolizing options keys') + symbolized_options = {} + options.each_key { |k| symbolized_options.merge!({ k.to_sym => options[k] }) } + symbolized_options end - def compile_channel_parameter(channel, channels) - raise(ArgumentError, 'Can\'t handle both :channel and :channels parameters given.') if channel && channels - channel = channels if channels - channel = channel.to_s if channel.class == Symbol - channel = channel.map! {|c| c.to_s}.join(',') if channel.class == Array - return channel + def classify_method(method) + method.split('_').map{ |w| w.capitalize }.join end + def validate!(parameters) + raise InitializationError.new(:object => self, :message => 'Origin parameter is not valid. Should be type of String') unless parameters[:origin].is_a?(String) || parameters[:origin].blank? + raise InitializationError.new(:object => self, :message => 'Missing required :subscribe_key parameter') unless parameters[:subscribe_key] + raise InitializationError.new(:object => self, :message => 'Subscribe key parameter is not valid. Should be type of String or Symbol') unless [String, Symbol].include?(parameters[:subscribe_key].class) + raise InitializationError.new(:object => self, :message => 'Publish key parameter is not valid. Should be type of String or Symbol') unless [String, Symbol].include?(parameters[:publish_key].class) || parameters[:publish_key].blank? + end + end end -