# frozen_string_literal: true

require 'json'
require 'faye/websocket'
require 'websocket/extensions'
require 'permessage_deflate'

module Cryptum
  # This plugin is used to Establish a Web
  # Socket Connection with Coinbase
  module WebSock
    # This module is used to Establish a Web
    # Socket Connection with Coinbase
    module Coinbase
      # Supported Method Parameters::
      # ws = Cryptum::WebSock::Coinbase.connect(
      #   option_choice: 'required - object responding to #proxy (falsy for a direct connection)',
      #   env: 'required - Hash; env[:env] == :sandbox selects the sandbox feed'
      # )
      #
      # Returns the Faye::WebSocket::Client created for the selected feed.

      public_class_method def self.connect(opts = {})
        option_choice = opts[:option_choice]
        env = opts[:env]

        # Hosts used previously, kept for reference:
        # cb_pro_ws_feed = 'wss://ws-feed.pro.coinbase.com'
        # cb_pro_ws_feed = 'wss://ws-feed-public.sandbox.pro.coinbase.com' if env[:env] == :sandbox
        cb_pro_ws_feed = 'wss://ws-feed.exchange.coinbase.com'
        cb_pro_ws_feed = 'wss://ws-feed-public.sandbox.exchange.coinbase.com' if env[:env] == :sandbox

        # Options shared by the direct and the proxied connection.
        client_opts = {
          extensions: [PermessageDeflate],
          ping: 9
        }

        if option_choice.proxy
          # NOTE(review): peer verification is switched off whenever a proxy is
          # configured -- presumably to allow an intercepting proxy; confirm
          # this is intended before relying on it outside of debugging.
          client_opts[:tls] = { verify_peer: false }
          client_opts[:proxy] = { origin: option_choice.proxy }
        end

        Faye::WebSocket::Client.new(cb_pro_ws_feed, [], client_opts)
      rescue Interrupt
        # Exit Gracefully if CTRL+C is Pressed During Session
        Cryptum::UI::Exit.gracefully(which_self: self)
      end

      # Supported Method Parameters::
      # subscribe_json = Cryptum::WebSock::Coinbase.subscribe_message(
      #   option_choice: 'required - object responding to #symbol (e.g. :btc_usd becomes "BTC-USD")',
      #   env: 'required - Hash providing :api_key, :api_secret and :api_passphrase'
      # )
      #
      # Returns a JSON String subscribing to the level2_batch, ticker and user
      # channels for the product, signed via Cryptum::API::Signature.generate.

      public_class_method def self.subscribe_message(opts = {})
        option_choice = opts[:option_choice]
        env = opts[:env]

        product_id = option_choice.symbol.to_s.tr('_', '-').upcase

        api_signature_response = Cryptum::API::Signature.generate(
          api_secret: env[:api_secret]
        )

        # Serialize with JSON.generate instead of hand-built interpolation so
        # that quotes or backslashes inside a credential are escaped properly.
        # Every credential is coerced with #to_s so it is still emitted as a
        # JSON string, exactly as the interpolated template did.
        payload = {
          type: 'subscribe',
          product_ids: [product_id],
          channels: %w[level2_batch ticker user],
          key: env[:api_key].to_s,
          passphrase: env[:api_passphrase].to_s,
          timestamp: api_signature_response[:api_timestamp].to_s,
          signature: api_signature_response[:api_signature].to_s
        }

        JSON.generate(payload)
      end

      # Display Usage for this Module

      public_class_method def self.help
        puts <<~USAGE
          USAGE:
            ws = #{self}.connect(
              option_choice: 'required - object responding to #proxy',
              env: 'required - Hash with :env (:sandbox selects the sandbox feed)'
            )

            subscribe_json = #{self}.subscribe_message(
              option_choice: 'required - object responding to #symbol',
              env: 'required - Hash with :api_key, :api_secret and :api_passphrase'
            )
        USAGE
      end
    end
  end
end