lib/client/ws_client.rb in scale_rb-0.3.3 vs lib/client/ws_client.rb in scale_rb-0.3.4
- old
+ new
@@ -1,44 +1,57 @@
require 'async'
require 'async/websocket/client'
require 'async/http/endpoint'
-require 'async/queue'
-require 'json'
require_relative 'client_ext'
module ScaleRb
class WsClient
- def self.start(url)
- Sync do |task|
- endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
+ class << self
+ # @param [string] url
+ def start(url)
+ Sync do
+ endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
- Async::WebSocket::Client.connect(endpoint) do |connection|
- client = WsClient.new(connection)
+ Async::WebSocket::Client.connect(endpoint) do |connection|
+ client = WsClient.new(connection)
- recv_task = task.async do
- while message = connection.read
- data = message.parse
- ScaleRb.logger.debug "Received message: #{data}"
+ # `recv_task` does not raise errors (subclass of StandardError), so it will not be stopped by any errors.
+ recv_task = Async do
+ while (message = client.read_message)
+ data = parse_message(message)
+ next if data.nil?
- task.async do
- client.handle_response(data)
+ ScaleRb.logger.debug "Received response: #{data}"
+ Async do
+ client.handle_response(data)
+ end
end
end
- end
- client.supported_methods = client.rpc_methods()[:methods]
- yield client
+ client.supported_methods = client.rpc_methods()[:methods]
+ yield client
- recv_task.wait
- ensure
- recv_task&.stop
+ recv_task.wait
+ ensure
+ recv_task&.stop
+ end
end
- end # Sync
- end # start
+ end
+
+ private
+
+ def parse_message(message)
+ message.parse
+ rescue StandardError => e
+ ScaleRb.logger.error "Error while parsing message: #{e.inspect}, message: #{message}"
+ nil
+ end
+ end
+
end
end
module ScaleRb
class WsClient
@@ -66,11 +79,11 @@
end
if method.include?('unsubscribe')
unsubscribe(method, args[0])
elsif method.include?('subscribe')
- raise "A subscribe method needs a block" unless block_given?
+ raise 'A subscribe method needs a block' unless block_given?
subscribe(method, args) do |notification|
yield notification[:params][:result]
end
else
@@ -99,14 +112,28 @@
if response.key?(:id)
@response_handler.handle(response)
elsif response.key?(:method)
@subscription_handler.handle(response)
else
- puts "Received an unknown message: #{response}"
+ ScaleRb.logger.info "Received an unknown response: #{response}"
end
+ rescue StandardError => e
+ ScaleRb.logger.error "Error while handling response: #{e.inspect}"
+ ScaleRb.logger.debug e.backtrace.join("\n")
end
+ def read_message
+ loop do
+ return @connection.read
+ rescue StandardError => e
+ ScaleRb.logger.error "Error while read message from connection: #{e.inspect}"
+ ScaleRb.logger.debug e.backtrace.join("\n")
+ sleep 1
+ retry
+ end
+ end
+
private
def request(method, params = [])
response_future = Async::Notification.new
@@ -138,10 +165,10 @@
if @callbacks.key?(id)
callback = @callbacks[id]
callback.call(response)
@callbacks.delete(id)
else
- ScaleRb.logger.debug "Received a message with unknown id: #{response}"
+ ScaleRb.logger.info "Received a message with unknown id: #{response}"
end
end
end
class SubscriptionHandler