lib/client/ws_client.rb in scale_rb-0.3.2 vs lib/client/ws_client.rb in scale_rb-0.3.3

- old
+ new

@@ -7,64 +7,48 @@ require_relative 'client_ext' module ScaleRb class WsClient def self.start(url) - Async do |task| + + Sync do |task| endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) - client = WsClient.new - task.async do - Async::WebSocket::Client.connect(endpoint) do |connection| - Async do - while request = client.next_request - ScaleRb.logger.debug "Sending request: #{request.to_json}" - connection.write(request.to_json) - end - end + Async::WebSocket::Client.connect(endpoint) do |connection| + client = WsClient.new(connection) - # inside main task + recv_task = task.async do while message = connection.read - data = JSON.parse(message) + data = message.parse ScaleRb.logger.debug "Received message: #{data}" - Async do + task.async do client.handle_response(data) - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop end end - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - ensure - task.stop end - end - task.async do - client.supported_methods = client.rpc_methods()['methods'] + client.supported_methods = client.rpc_methods()[:methods] yield client - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop + + recv_task.wait + ensure + recv_task&.stop end - end - end + end # Sync + + end # start end end module ScaleRb class WsClient include ClientExt attr_accessor :supported_methods - def initialize - @queue = Async::Queue.new + def initialize(connection) + @connection = connection @response_handler = ResponseHandler.new @subscription_handler = SubscriptionHandler.new @request_id = 1 end @@ -85,11 +69,11 @@ unsubscribe(method, args[0]) elsif method.include?('subscribe') raise "A subscribe method needs a block" unless block_given? subscribe(method, args) do |notification| - yield notification['params']['result'] + yield notification[:params][:result] end else request(method, args) end end @@ -109,18 +93,14 @@ if @subscription_handler.unsubscribe(subscription_id) request(method, [subscription_id]) end end - def next_request - @queue.dequeue - end - def handle_response(response) - if response.key?('id') + if response.key?(:id) @response_handler.handle(response) - elsif response.key?('method') + elsif response.key?(:method) @subscription_handler.handle(response) else puts "Received an unknown message: #{response}" end end @@ -129,91 +109,62 @@ def request(method, params = []) response_future = Async::Notification.new @response_handler.register(@request_id, proc { |response| - # this is running in the main task - response_future.signal(response['result']) + response_future.signal(response[:result]) }) - request = JsonRpcRequest.new(@request_id, method, params) - @queue.enqueue(request) + request = { jsonrpc: '2.0', id: @request_id, method: method, params: params } + ScaleRb.logger.debug "Sending request: #{request}" + @connection.write(request.to_json) @request_id += 1 - response_future.wait end end - class JsonRpcRequest - attr_reader :id, :method, :params - - def initialize(id, method, params = {}) - @id = id - @method = method - @params = params - end - - def to_json(*_args) - { jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json - end - - # def to_s - # to_json - # end - end - class ResponseHandler def initialize - @handlers = {} + @callbacks = {} end - # handler: a proc with response data as param - def register(id, handler) - @handlers[id] = handler + # callback: a proc with response data as param + def register(id, callback) + @callbacks[id] = callback end def handle(response) - id = response['id'] - if @handlers.key?(id) - handler = @handlers[id] - handler.call(response) - @handlers.delete(id) + id = response[:id] + if @callbacks.key?(id) + callback = @callbacks[id] + callback.call(response) + @callbacks.delete(id) else ScaleRb.logger.debug "Received a message with unknown id: #{response}" end end end class SubscriptionHandler def initialize - @subscriptions = {} + @callbacks = {} end - def subscribe(subscription_id, handler) - @subscriptions[subscription_id] = handler + def subscribe(subscription_id, callback) + @callbacks[subscription_id] = callback end def unsubscribe(subscription_id) - @subscriptions.delete(subscription_id) + @callbacks.delete(subscription_id) end def handle(notification) - subscription_id = notification.dig('params', 'subscription') + subscription_id = notification.dig(:params, :subscription) return if subscription_id.nil? - if @subscriptions.key?(subscription_id) - @subscriptions[subscription_id].call(notification) - else - # the subscription_id may be not registered. - # in client.subscribe function, - # ... - # subscription_id = request(method, params) - # @subscription_handler.subscribe(subscription_id, block) - # ... - # the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes. - sleep 0.01 - handle(notification) + if @callbacks.key?(subscription_id) + @callbacks[subscription_id].call(notification) end end end end