require 'base64' require 'pubnub/error' require 'pubnub/uuid' require 'pubnub/formatter' require 'pubnub/crypto' require 'pubnub/constants' require 'pubnub/configuration' require 'pubnub/subscribe_callback' require 'pubnub/schemas/envelope_schema' require 'pubnub/event' require 'pubnub/single_event' require 'pubnub/subscribe_event/callbacks' require 'pubnub/subscribe_event/formatter' require 'pubnub/subscribe_event/heartbeat' require 'pubnub/subscribe_event/adding' require 'pubnub/subscribe_event/removing' require 'pubnub/subscribe_event' require 'pubnub/pam' require 'pubnub/heart' require 'pubnub/subscriber' require 'pubnub/telemetry' require 'pubnub/envelope' require 'pubnub/error_envelope' require 'pubnub/client/events' require 'pubnub/client/paged_history' require 'pubnub/client/helpers' require 'pubnub/client/getters_setters' require 'pubnub/validators/common_validator' require 'pubnub/validators/client' require 'pubnub/validators/audit' require 'pubnub/validators/channel_registration' require 'pubnub/validators/grant' require 'pubnub/validators/grant_token' require 'pubnub/validators/revoke_token' require 'pubnub/validators/heartbeat' require 'pubnub/validators/here_now' require 'pubnub/validators/history' require 'pubnub/validators/leave' require 'pubnub/validators/presence' require 'pubnub/validators/publish' require 'pubnub/validators/revoke' require 'pubnub/validators/set_state' require 'pubnub/validators/state' require 'pubnub/validators/subscribe' require 'pubnub/validators/time' require 'pubnub/validators/where_now' require 'pubnub/validators/delete' require 'pubnub/validators/message_counts' require 'pubnub/validators/add_channels_to_push' require 'pubnub/validators/list_push_provisions' require 'pubnub/validators/remove_channels_from_push' require 'pubnub/validators/remove_device_from_push' require 'pubnub/validators/signal' require 'pubnub/validators/set_uuid_metadata' require 'pubnub/validators/set_channel_metadata' require 'pubnub/validators/remove_uuid_metadata' require 'pubnub/validators/remove_channel_metadata' require 'pubnub/validators/get_uuid_metadata' require 'pubnub/validators/get_all_uuid_metadata' require 'pubnub/validators/get_channel_metadata' require 'pubnub/validators/get_all_channels_metadata' require 'pubnub/validators/get_channel_members' require 'pubnub/validators/get_memberships' require 'pubnub/validators/set_channel_members' require 'pubnub/validators/set_memberships' require 'pubnub/validators/remove_channel_members' require 'pubnub/validators/remove_memberships' require 'pubnub/cbor' Dir[File.join(File.dirname(__dir__), 'pubnub', 'events', '*.rb')].each do |file| require file end # Toplevel Pubnub module. module Pubnub # Pubnub client Class. class Client include Configuration include Events include PagedHistory include Helpers include GettersSetters attr_reader :env, :subscriber, :telemetry VERSION = Pubnub::VERSION # Parameters: # =========== # #
#
subscribe_key
#
required. Your subscribe key.
# #
publish_key
#
optional. Your publish key, without it you can't push messages.
# #
secret_key
#
optional. Your secret key, required for PAM operations.
# #
auth_key
#
optional. This client auth key.
# #
cipher_key
#
optional. Required to encrypt messages.
# #
uuid
#
optional. Deprecated. Sets given uuid as client uuid, does not generates random uuid on init as usually
# #
user_id
#
required. Sets given user_id as client user_id.
# #
origin
#
optional. Specifies the fully qualified domain name of the PubNub origin. # By default this value is set to pubsub.pubnub.com but it should be set to the appropriate origin # specified in the PubNub Admin Portal.
# #
callback
#
optional. Default callback function for all events if not overwrote while firing event.
# #
ssl
#
optional. Your connection will use ssl if set to true.
# #
random_iv
#
optional. Whether data should be encrypted / decrypted using random initialization vector.
# #
heartbeat
#
optional. Heartbeat interval, if not set heartbeat will not be running.
# #
subscribe_timeout
#
optional, be careful when modifying this. Timeout for subscribe connection in seconds.
# #
non_subscribe_timeout
#
optional, be careful when modifying this. Timeout for non-subscribe connection in seconds.
# #
max_retries
#
optional. How many times client should try to reestablish connection before fail.
# #
ttl
#
optional. Default ttl for grant and revoke events.
#
# _examples:_ # ```ruby # # Minimal initialize # pubnub = Pubnub.new(subscribe_key: :my_sub_key) # ```` # # ```ruby # # More complex initialize # pubnub = Pubnub.new( # subscribe_key: :demo, # publish_key: :demo, # secret_key: :secret, # cipher_key: :other_secret, # user_id: :mad_max, # origin: 'custom.pubnub.com', # callback: ->(envelope) { puts envelope.message }, # connect_callback: ->(message) { puts message }, # heartbeat: 60, # subscribe_timeout: 310, # non_subscribe_timeout: 10, # max_retries: 10, # ttl: 0 # ) # ``` # Returns: # ======== # # Initialized Pubnub::Client ready to use. # def initialize(options) env_hash = symbolize_options_keys(options) setup_app env_hash clean_env prepare_env validate! @env @telemetry = Telemetry.new Pubnub.logger.debug('Pubnub::Client') do "Created new Pubnub::Client instance. Version: #{Pubnub::VERSION}" end end def add_listener(options) @subscriber.add_listener(options) end def remove_listener(options) @subscriber.remove_listener(options) end def subscribed_channels @subscriber.channels + @subscriber.wildcard_channels end def subscribed_groups @subscriber.groups end # Returns: # ======== # True if client is subscribed to at least one channel or channel group, otherwise false. def subscribed? if @subscriber.nil? false else ![@subscriber.channels, @subscriber.groups, @subscriber.wildcard_channels].flatten.empty? end end # Returns: # ======== # Hash with two keys: :channel and :group, representing currently subscribed channels and groups. def subscribed_to(separate_wildcard = false) if separate_wildcard { channel: @subscriber.channels, group: @subscriber.groups, wildcard_channel: @subscriber.wildcard_channels } else { channel: @subscriber.channels + @subscriber.wildcard_channels, group: @subscriber.groups } end end # Parameters: # =========== #
#
origin
#
Domain name where connection should be connected.
# #
event_type
#
Keyword. :subscribe_event or :single_event.
# #
sync
#
Boolean. True if we want dispatcher for sync or sync event, otherwise false.
#
# # Returns: # ======== # Appropriate RequestDispatcher. # # It returns always new RequestDispatcher for sync events. # For async events it checks if there's already RequestDispatcher # created and returns it if created, otherwise creates it, assigns # it in @env and returns newly created dispatcher. def request_dispatcher(origin, event_type, sync) Pubnub.logger.debug('Pubnub::Client') do "Looking for requester for #{sync ? 'sync' : 'async'} #{event_type}" end if sync @env[:req_dispatchers_pool][:sync][origin] ||= {} @env[:req_dispatchers_pool][:sync][origin][event_type] ||= setup_httpclient(event_type) else @env[:req_dispatchers_pool][:async][origin] ||= {} @env[:req_dispatchers_pool][:async][origin][event_type] ||= setup_httpclient(event_type) end end # Parameters: # =========== #
#
origin
#
Domain name where connection should be connected.
# #
event_type
#
Keyword. :subscribe_event or :single_event.
#
# # Functionality: # ============== # Terminates request dispatcher for given origin and event type. Usable while restarting subscription. def kill_request_dispatcher(origin, event_type) Pubnub.logger.debug('Pubnub::Client') { 'Killing requester' } # @env[:req_dispatchers_pool][origin][event_type].async.terminate @env[:req_dispatchers_pool][:async][origin][event_type].reset_all @env[:req_dispatchers_pool][:async][origin][event_type] = nil rescue StandardError Pubnub.logger.debug('Pubnub::Client') { 'There\'s no requester' } end def sequence_number_for_publish! @env[:sequence_number_for_publish] += 1 @env[:sequence_number_for_publish] % 2 ** 32 end def apply_state(event) Pubnub.logger.debug('Pubnub::Client') { 'Apply state' } create_state_pools(event) return unless event.state event.channel.each do |channel| @env[:state][event.origin][:channel][channel] = event.state end event.group.each do |group| @env[:state][event.origin][:group][group] = event.state end end def empty_state? return true unless @env[:state] totally_empty @env[:state] end def generate_ortt (::Time.now.to_f * 10_000_000).to_i end def record_telemetry(telemetry_type, time_start, time_end) @telemetry.async.record_request(telemetry_type, time_start, time_end) end def telemetry_for(event) @telemetry.await.fetch_average(event).value end def parse_token(token) token_bytes = Base64.urlsafe_decode64(token) Cbor.new.decode(token_bytes.bytes) end def set_token(token) @env[:token] = token end private def create_state_pools(event) @env[:state] ||= {} @env[:state][event.origin] ||= {} @env[:state][event.origin][:channel] ||= {} @env[:state][event.origin][:group] ||= {} end def setup_httpclient(event_type) hc = if ENV['HTTP_PROXY'] HTTPClient.new(ENV['HTTP_PROXY']) else HTTPClient.new end case event_type when :subscribe_event hc.connect_timeout = @env[:s_open_timeout] hc.send_timeout = @env[:s_send_timeout] hc.receive_timeout = @env[:s_read_timeout] unless @env[:disable_keepalive] || @env[:disable_subscribe_keepalive] hc.keep_alive_timeout = @env[:idle_timeout] hc.tcp_keepalive = true end when :single_event hc.connect_timeout = @env[:open_timeout] hc.send_timeout = @env[:send_timeout] hc.receive_timeout = @env[:read_timeout] unless @env[:disable_keepalive] || @env[:disable_non_subscribe_keepalive] hc.keep_alive_timeout = @env[:idle_timeout] hc.tcp_keepalive = true end end hc end def validate!(env) Validator::Client.validate! env end def setup_app(options) Pubnub.logger = options[:logger] || Logger.new('pubnub.log') Concurrent.global_logger = Pubnub.logger @subscriber = Subscriber.new(self) options[:user_id] = options[:uuid] if options[:user_id].nil? @env = options end def prepare_env assign_defaults setup_pools end def assign_defaults @env[:origin] = @env[:origins_pool].first if @env[:origins_pool] default_values.each do |k, v| @env[k] = v unless @env.has_key?(k) end @env[:timetoken] = 0 @env[:sequence_number_for_publish] = 0 end def symbolize_options_keys(options) symbolized_options = {} options.each_key do |k| symbolized_options.merge!(k.to_sym => options[k]) end symbolized_options end end end