lib/client/ws_client.rb in scale_rb-0.3.3 vs lib/client/ws_client.rb in scale_rb-0.3.4

- old
+ new

@@ -1,44 +1,57 @@ require 'async' require 'async/websocket/client' require 'async/http/endpoint' -require 'async/queue' -require 'json' require_relative 'client_ext' module ScaleRb class WsClient - def self.start(url) - Sync do |task| - endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) + class << self + # @param [string] url + def start(url) + Sync do + endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) - Async::WebSocket::Client.connect(endpoint) do |connection| - client = WsClient.new(connection) + Async::WebSocket::Client.connect(endpoint) do |connection| + client = WsClient.new(connection) - recv_task = task.async do - while message = connection.read - data = message.parse - ScaleRb.logger.debug "Received message: #{data}" + # `recv_task` does not raise errors (subclass of StandardError), so it will not be stopped by any errors. + recv_task = Async do + while (message = client.read_message) + data = parse_message(message) + next if data.nil? - task.async do - client.handle_response(data) + ScaleRb.logger.debug "Received response: #{data}" + Async do + client.handle_response(data) + end end end - end - client.supported_methods = client.rpc_methods()[:methods] - yield client + client.supported_methods = client.rpc_methods()[:methods] + yield client - recv_task.wait - ensure - recv_task&.stop + recv_task.wait + ensure + recv_task&.stop + end end - end # Sync - end # start + end + + private + + def parse_message(message) + message.parse + rescue StandardError => e + ScaleRb.logger.error "Error while parsing message: #{e.inspect}, message: #{message}" + nil + end + end + end end module ScaleRb class WsClient @@ -66,11 +79,11 @@ end if method.include?('unsubscribe') unsubscribe(method, args[0]) elsif method.include?('subscribe') - raise "A subscribe method needs a block" unless block_given? + raise 'A subscribe method needs a block' unless block_given? subscribe(method, args) do |notification| yield notification[:params][:result] end else @@ -99,14 +112,28 @@ if response.key?(:id) @response_handler.handle(response) elsif response.key?(:method) @subscription_handler.handle(response) else - puts "Received an unknown message: #{response}" + ScaleRb.logger.info "Received an unknown response: #{response}" end + rescue StandardError => e + ScaleRb.logger.error "Error while handling response: #{e.inspect}" + ScaleRb.logger.debug e.backtrace.join("\n") end + def read_message + loop do + return @connection.read + rescue StandardError => e + ScaleRb.logger.error "Error while read message from connection: #{e.inspect}" + ScaleRb.logger.debug e.backtrace.join("\n") + sleep 1 + retry + end + end + private def request(method, params = []) response_future = Async::Notification.new @@ -138,10 +165,10 @@ if @callbacks.key?(id) callback = @callbacks[id] callback.call(response) @callbacks.delete(id) else - ScaleRb.logger.debug "Received a message with unknown id: #{response}" + ScaleRb.logger.info "Received a message with unknown id: #{response}" end end end class SubscriptionHandler