require 'pubnub/event/signature'
require 'pubnub/event/formatter'

# Toplevel Pubnub module.
module Pubnub
  # Event class holds the most basic and required infrastructure for every
  # pubnub event; there are also the SingleEvent and SubscribeEvent modules.
  #
  # NOTE(review): @event and @telemetry_name are read here but never assigned
  # in this class - presumably concrete event classes set them before calling
  # this initializer; confirm against the subclasses.
  class Event
    attr_reader :origin, :callback, :channel, :channel_group, :fresh_clone,
                :open_timeout, :read_timeout, :idle_timeout, :group,
                :presence_callback, :wildcard_channel, :ssl, :state,
                :given_options, :with_presence

    alias channels channel
    alias channel_groups channel_group

    include Signature
    include EFormatter

    # Builds the event from the caller's options layered over the app
    # environment.
    #
    # @param options [Hash] per-call options; they override values from app.env.
    # @param app [Object] client object; must respond to env, current_origin
    #   and telemetry_for (and later sdk_version, user_id, record_telemetry).
    def initialize(options, app)
      @app = app
      @given_options = options

      # Work on a copy of the environment so the app-wide hash is untouched;
      # :state is dropped from the copy before merging the call options.
      env = app.env.clone
      env.delete(:state)
      create_variables_from_options(env.merge(options))

      @origin = @app.current_origin

      # format_channels / format_group / validate! are not defined in this
      # file - presumably provided by the included modules or subclasses.
      format_channels if enable_format_channels?
      format_group if enable_format_group?
      set_timestamp
      validate!

      telemetry = @app.telemetry_for(@telemetry_name)
      @compressed_body = nil
      @current_telemetry = telemetry ? telemetry.round(3) : nil

      Pubnub.logger.debug('Pubnub::Event') { "Initialized #{self.class}" }
    end

    # Sends the request, converts the response into envelopes, runs the
    # callback and finalizes the event.
    #
    # @return [Object] whatever #handle produced (the formatted envelopes).
    def fire
      Pubnub.logger.debug('Pubnub::Event') { "Fired event #{self.class}" }

      response = send_request
      envelopes = fire_callbacks(handle(response, uri))
      finalize_event(envelopes)
      envelopes
    end

    # Performs the HTTP call with the verb chosen by #operation_http_method.
    #
    # @param compressed_body [String] request body; also remembered in
    #   @compressed_body because verb selection and signing read it.
    # @param header [Hash] extra HTTP headers.
    # @return [Object, StandardError] the dispatcher's response, or the
    #   rescued exception itself when anything in the method raises.
    def send_request(compressed_body = '', header = {})
      Pubnub.logger.debug('Pubnub::Event') { '#send_request called' }

      @compressed_body = compressed_body
      sender = request_dispatcher
      Pubnub.logger.debug('Pubnub::Event') { '#send_request got sender' }

      telemetry_time_start = ::Time.now.to_f
      response = case operation_http_method
                 when 'get'
                   sender.get(uri.to_s, header: header)
                 when 'post'
                   sender.post(uri.to_s, body: compressed_body, header: header)
                 when 'patch'
                   sender.patch(uri.to_s, body: compressed_body, header: header)
                 else
                   sender.delete(uri.to_s, header: header)
                 end

      # Telemetry is best effort: a failure here must not discard the response.
      begin
        @app.record_telemetry(@telemetry_name, telemetry_time_start, ::Time.now.to_f)
      rescue StandardError => e
        Pubnub.logger.warn('Pubnub::Event') { "Couldn't record telemetry because of #{e}\n#{e.backtrace.join("\n")}" }
      end

      response
    rescue StandardError => e
      Pubnub.logger.error('Pubnub::Event') { e }
      e
    end

    # Builds the request URI: scheme, origin, path, query parameters and,
    # when available, a trailing signature.
    #
    # For events that are not a SubscribeEvent the result is cached in @uri:
    # with memo = true the method re-enters itself with memo = false, which
    # returns an already cached @uri or computes a fresh one, and stores the
    # outcome. SubscribeEvent instances always rebuild the URI.
    #
    # @param memo [Boolean] whether to store the computed URI in @uri.
    # @return [URI] the request URI.
    def uri(memo = true)
      unless is_a? SubscribeEvent
        return @uri = uri(false) if memo
        return @uri if @uri
      end

      # Skip signing when the parameters already carry a :signature entry.
      sa_signature = super_admin_signature(operation_http_method, @compressed_body) unless parameters.include?(:signature)

      scheme = @ssl ? 'https://' : 'http://'
      url = "#{scheme}#{@origin}#{path}?#{Formatter.params_hash_to_url_params(parameters)}"
      url += "&signature=#{sa_signature}" if sa_signature

      Pubnub.logger.debug('Pubnub::Event') { "Requested URI: #{url}" }

      URI(url)
    end

    # @return [Object] truthy when either @finalized or @stop is set.
    def finalized?
      @finalized || @stop
    end

    # @return [Boolean] true when the :http_sync option is truthy.
    def sync?
      @http_sync ? true : false
    end

    private

    # Hook consulted by #initialize before calling format_channels.
    # Always true here; presumably overridden by events that skip formatting.
    def enable_format_channels?
      true
    end

    # Hook consulted by #initialize before calling format_group.
    # Always true here; presumably overridden by events that skip formatting.
    def enable_format_group?
      true
    end

    # Maps the current operation (@event) to a lowercase HTTP verb.
    # Operations not listed fall back to 'get' when there is no body
    # (nil or empty @compressed_body) and to 'post' otherwise.
    #
    # @return [String] 'delete', 'patch', 'post' or 'get'.
    def operation_http_method
      case @event
      when Pubnub::Constants::OPERATION_DELETE,
           Pubnub::Constants::OPERATION_REMOVE_MESSAGE_ACTION,
           Pubnub::Constants::OPERATION_REMOVE_CHANNEL_METADATA,
           Pubnub::Constants::OPERATION_REMOVE_UUID_METADATA,
           Pubnub::Constants::OPERATION_REVOKE_TOKEN
        'delete'
      when Pubnub::Constants::OPERATION_SET_UUID_METADATA,
           Pubnub::Constants::OPERATION_SET_CHANNEL_METADATA,
           Pubnub::Constants::OPERATION_SET_CHANNEL_MEMBERS,
           Pubnub::Constants::OPERATION_SET_MEMBERSHIPS,
           Pubnub::Constants::OPERATION_REMOVE_CHANNEL_MEMBERS,
           Pubnub::Constants::OPERATION_REMOVE_MEMBERSHIPS
        'patch'
      when Pubnub::Constants::OPERATION_ADD_MESSAGE_ACTION
        'post'
      else
        if @compressed_body.nil? || @compressed_body.empty?
          'get'
        else
          'post'
        end
      end
    end

    # Invokes a callback, logging (instead of propagating) any StandardError
    # it raises.
    #
    # @param cb [#call] callback to invoke.
    # @param arg [Object] single argument passed to the callback.
    def secure_call(cb, arg)
      cb.call arg
    rescue StandardError => e
      Pubnub.logger.error('Pubnub::Event') { "Error while calling callback #{e.inspect}" }
    end

    # Runs the user callback (when one was given) with the envelope.
    #
    # @param envelope [Object] value produced by #handle.
    # @return [Object] the same envelope, unchanged.
    def fire_callbacks(envelope)
      Pubnub.logger.debug('Pubnub::Event') { "Firing callbacks for #{self.class}" }
      secure_call @callback, envelope if @callback
      envelope
    end

    # Query parameters shared by every request.
    #
    # :pnsdk is always present; :timestamp is added when a secret key is
    # configured and the event is not grant/revoke/audit. The auth, uuid and
    # telemetry entries are dropped when blank.
    #
    # @param _set_signature [Boolean] unused here; kept for interface
    #   compatibility.
    # @return [Hash] parameter name => value.
    def parameters(_set_signature = false)
      required = { pnsdk: @app.sdk_version }

      # A token from the app environment takes precedence over the auth key.
      token = @app.env[:token]
      empty_if_blank = {
        auth: token || @auth_key,
        uuid: @app.user_id,
        @telemetry_name => @current_telemetry
      }
      empty_if_blank[@telemetry_name] = nil if @app.env[:no_telemetry]

      required[:timestamp] = @timestamp if @app.env[:secret_key] && !%i[grant revoke audit].include?(@event)

      empty_if_blank.delete_if { |_k, v| v.blank? }

      required.merge(empty_if_blank)
    end

    # Converts a raw response into envelopes and remembers them in @envelopes.
    #
    # @param response [Object] value returned by #send_request.
    # @param request [URI] URI the request was sent to.
    # @return [Object] result of format_envelopes (defined outside this file).
    def handle(response, request)
      Pubnub.logger.debug('Pubnub::Event') { 'Event#handle' }
      @envelopes = format_envelopes response, request
    end

    # Copies every recognised option into an instance variable of the same
    # name. Instance variables that already hold a non-nil value are left
    # alone. For :subscribe and :presence events the timeouts are then taken
    # from the s_-prefixed options instead.
    #
    # @param options [Hash] merged environment and call options; keys may be
    #   strings or symbols.
    def create_variables_from_options(options)
      variables = %w[channel channels message http_sync callback
                     ssl cipher_key random_iv crypto_module secret_key auth_key
                     publish_key subscribe_key timetoken action_timetoken message_timetoken
                     open_timeout read_timeout idle_timeout heartbeat
                     group action read write delete manage ttl presence start
                     end count limit max reverse presence_callback store skip_validate
                     state channel_group channel_groups compressed meta customs custom_message_type
                     include_token include_custom_message_type include_message_actions
                     include_message_type replicate with_presence cipher_key_selector
                     include_meta include_uuid join update get
                     add remove push_token push_gateway environment topic authorized_uuid
                     authorized_user_id token type status value]

      # Normalise keys to symbols so string-keyed options are picked up too.
      options = options.each_with_object({}) { |option, obj| obj[option.first.to_sym] = option.last }

      variables.each do |variable|
        ivar = "@#{variable}"
        next if instance_variable_defined?(ivar) && !instance_variable_get(ivar).nil?

        instance_variable_set(ivar, options[variable.to_sym])
      end

      return if @event != :subscribe && @event != :presence

      @open_timeout = options[:s_open_timeout]
      @read_timeout = options[:s_read_timeout]
      @idle_timeout = options[:s_idle_timeout]
    end

    # Stores the current time (see #current_time) in @timestamp.
    def set_timestamp
      @timestamp = current_time
    end

    # @return [Integer] current time as whole seconds since the epoch.
    def current_time
      ::Time.now.to_i
    end

    # URL-encodes a value, using %20 rather than + for spaces.
    #
    # @param parameter [Object] value to encode.
    # @param format_json [Boolean] when true the value is serialised with
    #   to_json before encoding.
    # @return [String] encoded value.
    def encode_parameter(parameter, format_json = true)
      parameter = format_json ? parameter.to_json : parameter
      URI.encode_www_form_component(parameter).gsub('+', '%20')
    end

    # Resolves the cipher key. The first truthy candidate wins:
    # @compute_cipher_key, @cipher_key, env[:cipher_key_selector], then
    # env[:cipher_key] converted to a string. A callable candidate is
    # invoked with data.
    #
    # @param data [Object] argument handed to a callable key selector.
    # @return [Object] the key, or the selector's result.
    def compute_cipher_key(data)
      ck = @compute_cipher_key || @cipher_key || @app.env[:cipher_key_selector] || @app.env[:cipher_key].to_s
      return ck unless ck.respond_to?(:call)

      ck.call(data)
    end

    # Resolves the random IV setting the same way #compute_cipher_key
    # resolves the key: @compute_random_iv, @random_iv,
    # env[:random_iv_selector], then env[:random_iv] converted to a string.
    # A callable candidate is invoked with data.
    #
    # @param data [Object] argument handed to a callable selector.
    # @return [Object] the setting, or the selector's result.
    def compute_random_iv(data)
      ck = @compute_random_iv || @random_iv || @app.env[:random_iv_selector] || @app.env[:random_iv].to_s
      return ck unless ck.respond_to?(:call)

      ck.call(data)
    end

    # Data processing crypto module.
    #
    # @return [Pubnub::Crypto::CryptoProvider, nil] Crypto module for data encryption and
    #   decryption.
    attr_reader :crypto_module

    # Reads the 'message' entry of a parsed response.
    #
    # @param parsed_response [Object] parsed response body.
    # @return [Object, nil] the entry, or nil when the lookup raises.
    def error_message(parsed_response)
      parsed_response['message']
    rescue StandardError
      nil
    end

    # @return [Hash] snapshot of tls, uuid, auth_key and origin taken from
    #   the app, attached to error statuses.
    def get_config
      {
        tls: @app.env[:ssl],
        uuid: @app.user_id,
        auth_key: @app.env[:auth_key],
        origin: @app.current_origin
      }
    end

    # Builds an ErrorEnvelope describing a failed request.
    #
    # The status category and code depend on the error class: JSON parse
    # failures and generic errors take the code from the response, timeouts
    # use 408 and SSL errors carry no code. For generic errors the parsed
    # body is additionally mined for a message and details.
    #
    # @param parsed_response [Object, nil] parsed response body, if any.
    # @param error [Object] the error that occurred.
    # @param req_res_objects [Hash] holds the :request and :response objects.
    # @return [Pubnub::ErrorEnvelope]
    def error_envelope(parsed_response, error, req_res_objects)
      error_information = nil

      case error
      when JSON::ParserError
        error_category = Pubnub::Constants::STATUS_NON_JSON_RESPONSE
        code = req_res_objects[:response].code
      when HTTPClient::TimeoutError
        error_category = Pubnub::Constants::STATUS_TIMEOUT
        code = 408
      when OpenSSL::SSL::SSLError
        error_category = Pubnub::Constants::SSL_ERROR
        code = nil
      else
        error_category = Pubnub::Constants::STATUS_ERROR
        code = req_res_objects[:response].code

        # Trying to receive useful information about error.
        unless parsed_response.nil?
          error_msg, error_details = error_details_from_response parsed_response
          error_information = { message: error_msg, details: error_details } unless error_msg.nil?
        end
      end

      Pubnub::ErrorEnvelope.new(
        event: @event,
        event_options: @given_options,
        timetoken: nil,
        status: {
          code: code,
          operation: current_operation,
          client_request: req_res_objects[:request],
          server_response: req_res_objects[:response],
          data: error_information,
          category: error_category,
          error: true,
          auto_retried: false,
          config: get_config
        }
      )
    end

    # Extracts an error message and details from a parsed response body.
    #
    # A top-level 'message' entry is used first; when 'error' is a Hash, its
    # 'message' and 'details' entries take precedence.
    #
    # @param parsed_response [Object, nil] parsed response body.
    # @return [Array(Object, Object)] [message, details]; both nil when the
    #   body is not a Hash.
    def error_details_from_response(parsed_response)
      # Only Hash bodies have keyed entries. On a String, include? and []
      # are substring operations (the literal word "message" would be
      # returned); on an Array or Integer they raise.
      return [nil, nil] unless parsed_response.is_a?(Hash)

      error_msg = parsed_response.include?('message') ? parsed_response['message'] : nil
      error_details = nil

      error = parsed_response['error']
      if error.is_a?(Hash)
        error_msg = error['message'] if error.include? 'message'
        error_details = error['details'] if error.include? 'details'
      end

      [error_msg, error_details]
    end
  end
end