lib/pubnub/client.rb in pubnub-3.4.1 vs lib/pubnub/client.rb in pubnub-3.5.1
- old
+ new
@@ -1,535 +1,256 @@
-require 'pubnub/configuration.rb'
-require 'pubnub/subscription.rb'
-require 'em-http-request'
-require 'httparty'
-require 'persistent_httparty'
-require 'timeout'
+require 'pubnub/configuration'
+require 'pubnub/parser'
+require 'pubnub/envelope'
+require 'pubnub/crypto'
require 'pubnub/uuid'
+require 'pubnub/exceptions'
+require 'pubnub/event'
+require 'pubnub/formatter'
+require 'pubnub/validator'
+require 'pubnub/pam'
module Pubnub
- class PubNubHTTParty
- include HTTParty
- default_timeout 310
-
- def first_run?
- @first_run ? true : false
- end
-
- def send_request(path, options={}, &block)
- if @first_run.nil?
- @first_run = true
- else
- @first_run = false
- end
- self.class.get path, options
-
- end
- end
-
class Client
include Configuration
- attr_accessor :uuid, :cipher_key, :host, :query, :response, :timetoken, :url, :operation, :callback, :publish_key, :subscribe_key, :secret_key, :channel, :jsonp, :message, :ssl, :port
- attr_accessor :close_connection, :history_limit, :history_count, :history_start, :history_end, :history_reverse, :session_uuid, :last_timetoken, :origin, :error
+ attr_reader :env
+ attr_accessor :single_event_connections_pool, :subscribe_event_connections_pool, :uuid, :async_events
+ EVENTS = %w(publish subscribe presence leave history here_now audit grant revoke time)
+ VERSION = Pubnub::VERSION
- DEFAULT_CONNECT_CALLBACK = lambda { |msg| $log.info "CONNECTED: #{msg}" }
- DEFAULT_ERROR_CALLBACK = lambda { |msg| $log.error "AN ERROR OCCURRED: #{msg}" }
+ EVENTS.each do |event_name|
+ require File.join('pubnub', 'events', event_name)
+ end
- def initialize(options = {})
- $log = options[:logger]
- $log = Logger.new('pubnub.log', 0, 100 * 1024 * 1024) unless $log
+ EVENTS.each do |event_name|
+ define_method event_name do |params, &block|
+ params[:callback] = block if params[:callback].nil?
+ event = Pubnub.const_get(classify_method(event_name)).new(params, self)
+ $logger.debug('Created event ' + event.class.to_s)
+ #if params[:http_sync]
+ event.fire(self)
+ #else
+ # EM.defer {
+ # begin
+ # event.fire(self)
+ # rescue => e
+ # puts e
+ # end
+ # }
+ #end
+ end
+ end
+ alias_method :unsubscribe, :leave
- @subscriptions = Array.new
+ def initialize(options)
+ validate!(options)
+ setup_app(options)
+ # From this moment we have to use @env in that method instead of options
+ create_connections_pools(@env)
+ create_subscriptions_pools(@env)
+ start_event_machine(@env)
+ end
- @subscription_request = nil
- @retry = true
- @retry_count = 0
- @callback = options[:callback]
- @error_callback = options[:error_callback]
- @error_callback = DEFAULT_ERROR_CALLBACK unless @error_callback
- @connect_callback = options[:connect_callback]
- @connect_callback = DEFAULT_CONNECT_CALLBACK unless @connect_callback
- @cipher_key = options[:cipher_key]
- @publish_key = options[:publish_key] || DEFAULT_PUBLISH_KEY
- @subscribe_key = options[:subscribe_key] || DEFAULT_SUBSCRIBE_KEY
- @channel = options[:channel] || DEFAULT_CHANNEL
- @message = options[:message]
- @ssl = options[:ssl]
- @secret_key = options[:secret_key]
- @timetoken = options[:timetoken]
- @session_uuid = options[:uuid] || options[:session_uuid] || UUID.new.generate
+ def shutdown(stop_em = false)
+ @single_event_connections_pool.each do |origin, conn|
+ conn.shutdown_in_all_threads
+ conn = nil
+ end
+ @single_event_connections_pool = Hash.new
- @history_count = options[:count]
- @history_start = options[:start]
- @history_end = options[:end]
- @history_reverse = options[:reverse]
+ @subscribe_event_connections_pool.each do |origin, conn|
+ conn.shutdown_in_all_threads
+ conn = nil
+ end
+ @subscribe_event_connections_pool = Hash.new
- @port = options[:port]
- @url = options[:url]
- @origin = options[:origin]
- @origin = DEFAULT_ORIGIN unless @origin
- @query = options[:query]
+ @env[:callback_pool] = Hash.new
+ @env[:subscriptions] = Hash.new
- @http_sync = options[:http_sync]
+ EM.stop if stop_em
- @max_retries = options[:max_retries]
- @max_retries = MAX_RETRIES unless @max_retries
-
- @non_subscribe_timeout = options[:non_subscribe_timeout]
- @non_subscribe_timeout = 5 unless @non_subscribe_timeout
-
- @reconnect_max_attempts = options[:reconnect_max_attempts]
- @reconnect_max_attempts = 60 unless @reconnect_max_attempts
-
- @reconnect_retry_interval = options[:reconnect_retry_interval]
- @reconnect_retry_interval = 5 unless @reconnect_retry_interval
-
- @reconnect_response_timeout = options[:reconnect_response_timeout]
- @reconnect_response_timeout = 5 unless @reconnect_response_timeout
-
- @sync_connection_sub = Pubnub::PubNubHTTParty.new
- @sync_connection = Pubnub::PubNubHTTParty.new
-
- @pause_subscribe = false
+ $logger.info('Bye!')
end
- def publish(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'publish')
- verify_operation('publish', options)
- start_request options
+ def start_subscribe(override = false)
- end
+ start_event_machine
- def subscribe(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'subscribe')
- verify_operation('subscribe', options)
- @error_callback.call 'YOU ARE ALREADY SUBSCRIBED TO THAT CHANNEL' if get_channels_for_subscription.include? options[:channel]
- $log.error 'YOU ARE ALREADY SUBSCRIBED TO THAT CHANNEL' if get_channels_for_subscription.include? options[:channel]
- start_request options
- end
-
- def presence(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'presence')
- verify_operation('presence', options)
- start_request options
- end
-
- def history(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'history')
- verify_operation('history', options)
- options[:params].merge!({:count => options[:count]})
- options[:params].merge!({:start => options[:start]}) unless options[:start].nil?
- options[:params].merge!({:end => options[:end]}) unless options[:end].nil?
- options[:params].merge!({:reverse => 'true'}) if options[:reverse]
- start_request options
- end
-
- def leave(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'leave')
- verify_operation('leave', options)
- return false unless get_channels_for_subscription.include? options[:channel]
- remove_from_subscription options[:channel]
- if @subscriptions.empty?
- @timetoken = 0
- @subscription_request.timetoken = 0
- @subscribe_connection.close
- @wait_for_response = false
+ if override
+ $logger.debug('Pubnub::Client#start_subscribe | Override')
+ @env[:subscribe_railgun].cancel
+ @env[:subscribe_railgun] = nil
+ @env[:wait_for_response].each do |k,v|
+ @env[:wait_for_response][k] = false
+ end
end
- start_request options
- end
+ @env[:wait_for_response] = Hash.new unless @wait_for_response
+ @env[:subscribe_railgun] = EM.add_periodic_timer(PERIODIC_TIMER_INTERVAL) do
+ begin
+ @env[:subscriptions].each do |origin, subscribe|
+ unless @env[:wait_for_response][origin]
+ @env[:wait_for_response][origin] = true
- alias_method :unsubscribe, :leave
+ $logger.debug('Async subscription running')
+ $logger.debug("origin: #{origin}")
+ $logger.debug("timetoken: #{@env[:timetoken]}")
- def here_now(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'here_now')
- verify_operation('here_now', options)
- start_request options
- end
+ EM.defer do
+ subscribe.start_event(self) if subscribe
+ # @env[:wait_for_response][origin] = false # moved to Event
+ end
- def time(options = {}, &block)
- options[:callback] = block if block_given?
- options = merge_options(options, 'time')
- verify_operation('time', options)
- start_request options
+ end
+ end
+ rescue => e
+ $logger.error(e)
+ $logger.error(e.backtrace)
+ end
+ end unless @env[:subscribe_railgun]
end
def subscription_running?
- @subscription_running
+ @env[:subscribe_railgun] && !@env[:subscriptions].empty? ? true : false
end
- def active_subscriptions
- @subscription_request
+ def create_subscriptions_pools(env)
+ @env[:subscriptions] = Hash.new
+ @env[:callbacks_pool] = Hash.new
+ @env[:error_callbacks_pool] = Hash.new
end
- private
-
- def remove_from_subscription(channel)
- @subscriptions.delete_if { |s| s.channel.to_s == channel.to_s }
+ def update_timetoken(timetoken)
+ @env[:timetoken] = timetoken.to_i
+ $logger.debug("Pubnub::Client#update_timetoken | Current timetoken is eq #{@env[:timetoken]}")
end
- def merge_options(options = {}, operation = '')
- options[:channel] = compile_channel_parameter(options[:channel],options[:channels]) if options[:channel] || options[:channels]
- return {
- :ssl => @ssl,
- :cipher_key => @cipher_key,
- :publish_key => @publish_key,
- :subscribe_key => @subscribe_key,
- :secret_key => @secret_key,
- :origin => @origin,
- :operation => operation,
- :params => { :uuid => @session_uuid },
- :timetoken => @timetoken,
- :error_callback=> @error_callback
- }.merge(options)
+ def set_uuid(uuid)
+ leave_all unless @env[:subscriptions].empty?
+ @env[:uuid] = uuid
+ start_subscribe(true) unless @env[:subscriptions].empty?
end
+ alias_method :session_uuid=, :set_uuid
+ alias_method :uuid=, :set_uuid
- def start_em_if_not_running
- Thread.new do
- EM.run
- end unless EM.reactor_running?
-
- until EM.reactor_running? do end
+ def set_auth_key(auth_key)
+ leave_all unless @env[:subscriptions].empty?
+ @env[:auth_key] = auth_key
+ start_subscribe(true) unless @env[:subscriptions].empty?
end
+ alias_method :auth_key=, :set_auth_key
- def get_channels_for_subscription
- @subscriptions.map do |sub|
- sub.channel
- end
+ def set_cipher_key(cipher_key)
+ @env[:cipher_key] = cipher_key
end
+ alias_method :cipher_key=, :set_cipher_key
- def fire_subscriptions_callback_for(envelope)
- @subscriptions.each do |subscription|
- subscription.fire_callback_for envelope
- end
- end
-
- def start_request(options)
- request = Pubnub::Request.new(options)
- unless options[:http_sync]
- start_em_if_not_running
-
- if %w(subscribe presence).include? request.operation
- options[:channel].split(',').each do |channel|
- @subscriptions << Subscription.new(:channel => channel, :callback => options[:callback], :error_callback => options[:error_callback]) unless get_channels_for_subscription.include? channel
- end
-
- @subscription_request = request unless @subscription_request
-
- if @subscription_request.channel != get_channels_for_subscription.join(',') && @subscription_running
- @subscribe_connection.close
- @timetoken = 0
- @subscription_request.timetoken = 0
- @wait_for_response = false
- end
-
- @subscription_request.channel = get_channels_for_subscription.join(',')
-
- @subscription_running = EM.add_periodic_timer(PERIODIC_TIMER) do
- unless @wait_for_response || get_channels_for_subscription.empty?
- @wait_for_response = true
- $log.debug 'SETTING CHANNELS'
- @subscription_request.channel = get_channels_for_subscription.join(',')
- $log.debug 'SENDING SUBSCRIBE REQUEST'
- http = send_request(@subscription_request)
-
- http.callback do
- $log.debug 'GOT SUBSCRIBE RESPONSE'
- if http.response_header.status.to_i == 200
- $log.debug 'STATUS 200'
- if is_valid_json?(http.response)
- $log.debug 'GOT VALID JSON'
- @subscription_request.handle_response(http)
- $log.debug 'HANDLED RESPONSE'
- if is_update?(@subscription_request.timetoken)
- $log.debug 'TIMETOKEN UPDATED'
- @subscription_request.envelopes.each do |envelope|
- fire_subscriptions_callback_for envelope
- end
- else
- $log.debug 'TIMETOKEN NOT UPDATED'
- end
- end
- else
- if request.error_callback
- request.error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
- :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
- )
- else
- fire_subscriptions_callback_for Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
- :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
- )
- end
-
- end
-
- @wait_for_response = false
- end
-
- http.errback do
- $log.error 'GOT SUBSCRIBE ERROR'
- @error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, http.error].to_json,
- :response => [0, http.error].to_json
- )
- $log.error 'CALLED ERROR CALLBACK'
-
- @wait_for_response = false
- end
- end
- end unless @subscription_running
- else
- EM.next_tick do
- $log.debug 'SENDING OTHER REQUEST'
-
- http = send_request(request)
-
- http.errback do
- @error_callback.call [0, http.error]
- end
-
- http.callback do
- $log.debug 'GOT OTHER RESPONSE'
- #byebug
- if http.response_header.status.to_i == 200
- if is_valid_json?(http.response)
- request.handle_response(http)
- request.envelopes.each do |envelope|
- $log.debug 'CALLING PARAMETER CALLBACK'
- request.callback.call envelope
- end
- end
- else
- begin
- request.handle_response(http)
- request.envelopes.each do |envelope|
- if request.error_callback
- request.error_callback.call envelope
- else
- @error_callback.call envelope
- end
- end
-
- rescue
- if request.error_callback
- request.error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
- :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
- )
- else
- @error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json,
- :response => [0, "Bad server response: #{http.response_header.status.to_i}"].to_json
- )
- end
- end
- end
- end
- end
- end
+ def start_railgun
+ if @env[:railgun]
+ $logger.debug('Pubnub::Client#start_railgun | Railgun already initialized')
else
- begin
- if @timetoken.to_i == 0 && request.operation == 'subscribe'
- time(:http_sync => true){|envelope| @timetoken = envelope.message.to_i }
- end
- begin
- if request.query.to_s.empty?
- if %w(subscribe presence).include? request.operation
- response = @sync_connection_sub.send_request(request.origin + request.path, :timeout => 370)
- else
- response = @sync_connection.send_request(request.origin + request.path, :timeout => @non_subscribe_timeout)
- end
- else
- if %w(subscribe presence).include? request.operation
- response = @sync_connection_sub.send_request(request.origin + request.path, :query => request.query, :timeout => 370)
- else
- response = @sync_connection.send_request(request.origin + request.path, :query => request.query, :timeout => @non_subscribe_timeout)
- end
+ $logger.debug('Pubnub::Client#start_railgun | Initializing railgun')
+ @env[:railgun] = EM.add_periodic_timer(0.01) do
+ @async_events.each do |event|
+ EM.defer do
+ event.fire(self) unless event.fired
end
- rescue
- msg = 'ERROR SENDING REQUEST'
- @error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, msg].to_s,
- :response => [0, msg].to_s
- )
- @retries = 0 unless @retries
- @retries += 1
- if @retries <= @max_retries
- return start_request options
- else
- msg = "ERROR SENDING REQUEST AFTER #{@retries} RETRIES"
- @retries = 0
- return Pubnub::Response.new(
- :error_init => true,
- :message => [0, msg].to_s,
- :response => [0, msg].to_s
- )
- end
end
-
- if @sync_connection_sub.first_run?
- @connect_callback.call 'SYNC CONNECTION ESTABLISHED'
- end
- if response.response.code.to_i == 200
- if is_valid_json?(response.body)
- request.handle_response(response)
- @timetoken = request.timetoken
-
- if request.operation == 'leave'
- Subscription.remove_from_subscription request.channel
- end
-
- if !request.callback.nil?
- request.envelopes.each do |envelope|
- request.callback.call envelope
- end
- else
- if %w(publish leave here_now time).include? request.operation
- return request.envelopes[0]
- else
- return request.envelopes
- end
- end
- end
- else
- begin
- request.handle_response(response)
- if !request.callback.nil?
- request.envelopes.each do |envelope|
- request.callback.call envelope
- end
- else
- if %w(publish leave here_now time).include? request.operation
- return request.envelopes[0]
- else
- return request.envelopes
- end
- end
- rescue
- if request.error_callback
- request.error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{response.response.code.to_i}"].to_json,
- :response => [0, "Bad server response: #{response.response.code.to_i}"].to_json
- )
- else
- @error_callback.call Pubnub::Response.new(
- :error_init => true,
- :message => [0, "Bad server response: #{response.response.code.to_i}"].to_json,
- :response => [0, "Bad server response: #{response.response.code.to_i}"].to_json
- )
- end
- end
-
- if @sync_retries
- @sync_retries += 1
- else
- @sync_retries = 1
- end
-
- if @sync_retries < @max_retries
- start_request options
- end
- end
- rescue Timeout::Error
- if request.error_callback
- request.error_callback.call [0, 'TIMEOUT']
- else
- @error_callback.call [0, 'TIMEOUT']
- end
+ @async_events.delete_if {|event| event.finished }
end
end
end
- def send_request(request)
- if %w(subscribe presence).include? request.operation
- unless @subscribe_connection
- @subscribe_connection = EM::HttpRequest.new(request.origin, :connect_timeout => 370, :inactivity_timeout => 370)
- connection = @subscribe_connection.get :path => '/time/0', :keepalive => true, :query => request.query
- connection.callback do
- @connect_callback.call 'ASYNC SUBSCRIBE CONNECTION'
- end
- end
+ private
- @subscribe_connection.get :path => request.path, :query => request.query, :keepalive => true
- else
- unless @connection
- @connection = EM::HttpRequest.new request.origin
- end
- @connection.get :path => request.path, :query => request.query, :keepalive => true
+ def leave_all
+ @env[:subscriptions].each do |origin, subscribe|
+ leave(
+ :origin => origin,
+ :channel => subscribe.get_channels.map{|c| c.to_s}.join(','),
+ :http_sync => true,
+ :skip_remove => true
+ )
+
+ subscribe.set_timetoken(0)
end
end
- def is_update?(timetoken)
- if @timetoken.to_i < timetoken.to_i
- @timetoken = timetoken
+ def start_event_machine(options = nil)
+ $logger.debug 'Pubnub::Client#start_event_machine | starting EM in new thread'
+ if defined?(Thin)
+ $logger.debug('Pubnub::Client#start_event_machine | We\'re running on thin')
else
- false
+ $logger.debug('Pubnub::Client#start_event_machine | We aren\'t running on thin')
end
+ if EM.reactor_running?
+ $logger.debug 'Pubnub::Client#start_event_machine | EM already running'
+ else
+ Thread.new { EM.run {} }
+ $logger.debug 'Pubnub::Client#start_event_machine | EM started in new thread'
+ end
+ end
+ def setup_app(options)
+ $logger = options[:logger] || Logger.new('pubnub.log')
+ @env = symbolize_options_keys(options)
+ @env = set_default_values(@env)
+ @async_events = Array.new
+ $logger.debug("\n\nCreated new Pubnub::Client instance")
end
- def is_valid_json?(response)
- begin
- JSON.parse(response)
- valid = true
- rescue
- valid = false
- end
- valid
+ def create_connections_pools(options)
+ @subscribe_event_connections_pool = Hash.new
+ @single_event_connections_pool = Hash.new
end
- def verify_operation(operation, options)
- case operation
- when 'publish'
- raise(ArgumentError, 'publish() requires :channel, :message parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) && options[:message]
- when 'subscribe'
- raise(ArgumentError, 'subscribe() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync])
- when 'presence'
- raise(ArgumentError, 'presence() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync])
- when 'time'
- raise(ArgumentError, 'time() require, if async, callback parameter or block given.') unless (options[:callback] || options[:block_given] || options[:http_sync])
- when 'history'
- raise(ArgumentError, 'history() requires :channel, :count parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync]) && options[:count]
- when 'here_now'
- raise(ArgumentError, 'here_now() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync])
- when 'leave'
- raise(ArgumentError, 'leave() requires :channel parameters and, if async, callback parameter or block given.') unless (options[:channel] || options[:channels]) && (options[:callback] || options[:block_given] || options[:http_sync])
- end
+ def set_default_values(env)
+ defaults = {
+ :error_callback => DEFAULT_ERROR_CALLBACK,
+ :connect_callback => DEFAULT_CONNECT_CALLBACK,
+ :ssl => DEFAULT_SSL,
+ :timetoken => DEFAULT_TIMETOKEN,
+ :uuid => UUID.new.generate,
+ :port => DEFAULT_CONNECTION_PORT,
+ :origin => DEFAULT_ORIGIN,
+ :subscribe_timeout => DEFAULT_SUBSCRIBE_TIMEOUT,
+ :timeout => DEFAULT_NON_SUBSCRIBE_TIMEOUT,
+ :max_retries => MAX_RETRIES,
+ :non_subscribe_timeout => DEFAULT_NON_SUBSCRIBE_TIMEOUT,
+ :reconnect_max_attempts => DEFAULT_RECONNECT_ATTEMPTS,
+ :reconnect_retry_interval => DEFAULT_RECONNECT_INTERVAL,
+ :reconnect_response_timeout => DEFAULT_RECONNECT_RESPONSE_TIMEOUT,
+ :ttl => DEFAULT_TTL,
+ :secret_key => 0
+ }
- unless options[:callback].nil?
- raise('callback is invalid.') unless options[:callback].respond_to? 'call'
+ # Let's fill missing keys with default values
+ $logger.debug('Setting default values')
+ defaults.each do |key,default_value|
+ env[key] = default_value if @env[key].nil?
end
- unless options[:error_callback].nil?
- raise('error_callback is invalid.') unless options[:error_callback].respond_to? 'call'
- end
+ env
+ end
+ def symbolize_options_keys(options)
+ $logger.debug('Symbolizing options keys')
+ symbolized_options = {}
+ options.each_key { |k| symbolized_options.merge!({ k.to_sym => options[k] }) }
+ symbolized_options
end
- def compile_channel_parameter(channel, channels)
- raise(ArgumentError, 'Can\'t handle both :channel and :channels parameters given.') if channel && channels
- channel = channels if channels
- channel = channel.to_s if channel.class == Symbol
- channel = channel.map! {|c| c.to_s}.join(',') if channel.class == Array
- return channel
+ def classify_method(method)
+ method.split('_').map{ |w| w.capitalize }.join
end
+ def validate!(parameters)
+ raise InitializationError.new(:object => self, :message => 'Origin parameter is not valid. Should be type of String') unless parameters[:origin].is_a?(String) || parameters[:origin].blank?
+ raise InitializationError.new(:object => self, :message => 'Missing required :subscribe_key parameter') unless parameters[:subscribe_key]
+ raise InitializationError.new(:object => self, :message => 'Subscribe key parameter is not valid. Should be type of String or Symbol') unless [String, Symbol].include?(parameters[:subscribe_key].class)
+ raise InitializationError.new(:object => self, :message => 'Publish key parameter is not valid. Should be type of String or Symbol') unless [String, Symbol].include?(parameters[:publish_key].class) || parameters[:publish_key].blank?
+ end
+
end
end
-