lib/client/ws_client.rb in scale_rb-0.3.2 vs lib/client/ws_client.rb in scale_rb-0.3.3
- old
+ new
@@ -7,64 +7,48 @@
require_relative 'client_ext'
module ScaleRb
class WsClient
def self.start(url)
- Async do |task|
+
+ Sync do |task|
endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
- client = WsClient.new
- task.async do
- Async::WebSocket::Client.connect(endpoint) do |connection|
- Async do
- while request = client.next_request
- ScaleRb.logger.debug "Sending request: #{request.to_json}"
- connection.write(request.to_json)
- end
- end
+ Async::WebSocket::Client.connect(endpoint) do |connection|
+ client = WsClient.new(connection)
- # inside main task
+ recv_task = task.async do
while message = connection.read
- data = JSON.parse(message)
+ data = message.parse
ScaleRb.logger.debug "Received message: #{data}"
- Async do
+ task.async do
client.handle_response(data)
- rescue => e
- ScaleRb.logger.error "#{e.class}: #{e.message}"
- ScaleRb.logger.error e.backtrace.join("\n")
- task.stop
end
end
- rescue => e
- ScaleRb.logger.error "#{e.class}: #{e.message}"
- ScaleRb.logger.error e.backtrace.join("\n")
- ensure
- task.stop
end
- end
- task.async do
- client.supported_methods = client.rpc_methods()['methods']
+ client.supported_methods = client.rpc_methods()[:methods]
yield client
- rescue => e
- ScaleRb.logger.error "#{e.class}: #{e.message}"
- ScaleRb.logger.error e.backtrace.join("\n")
- task.stop
+
+ recv_task.wait
+ ensure
+ recv_task&.stop
end
- end
- end
+ end # Sync
+
+ end # start
end
end
module ScaleRb
class WsClient
include ClientExt
attr_accessor :supported_methods
- def initialize
- @queue = Async::Queue.new
+ def initialize(connection)
+ @connection = connection
@response_handler = ResponseHandler.new
@subscription_handler = SubscriptionHandler.new
@request_id = 1
end
@@ -85,11 +69,11 @@
unsubscribe(method, args[0])
elsif method.include?('subscribe')
raise "A subscribe method needs a block" unless block_given?
subscribe(method, args) do |notification|
- yield notification['params']['result']
+ yield notification[:params][:result]
end
else
request(method, args)
end
end
@@ -109,18 +93,14 @@
if @subscription_handler.unsubscribe(subscription_id)
request(method, [subscription_id])
end
end
- def next_request
- @queue.dequeue
- end
-
def handle_response(response)
- if response.key?('id')
+ if response.key?(:id)
@response_handler.handle(response)
- elsif response.key?('method')
+ elsif response.key?(:method)
@subscription_handler.handle(response)
else
puts "Received an unknown message: #{response}"
end
end
@@ -129,91 +109,62 @@
def request(method, params = [])
response_future = Async::Notification.new
@response_handler.register(@request_id, proc { |response|
- # this is running in the main task
- response_future.signal(response['result'])
+ response_future.signal(response[:result])
})
- request = JsonRpcRequest.new(@request_id, method, params)
- @queue.enqueue(request)
+ request = { jsonrpc: '2.0', id: @request_id, method: method, params: params }
+ ScaleRb.logger.debug "Sending request: #{request}"
+ @connection.write(request.to_json)
@request_id += 1
-
response_future.wait
end
end
- class JsonRpcRequest
- attr_reader :id, :method, :params
-
- def initialize(id, method, params = {})
- @id = id
- @method = method
- @params = params
- end
-
- def to_json(*_args)
- { jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json
- end
-
- # def to_s
- # to_json
- # end
- end
-
class ResponseHandler
def initialize
- @handlers = {}
+ @callbacks = {}
end
- # handler: a proc with response data as param
- def register(id, handler)
- @handlers[id] = handler
+ # callback: a proc with response data as param
+ def register(id, callback)
+ @callbacks[id] = callback
end
def handle(response)
- id = response['id']
- if @handlers.key?(id)
- handler = @handlers[id]
- handler.call(response)
- @handlers.delete(id)
+ id = response[:id]
+ if @callbacks.key?(id)
+ callback = @callbacks[id]
+ callback.call(response)
+ @callbacks.delete(id)
else
ScaleRb.logger.debug "Received a message with unknown id: #{response}"
end
end
end
class SubscriptionHandler
def initialize
- @subscriptions = {}
+ @callbacks = {}
end
- def subscribe(subscription_id, handler)
- @subscriptions[subscription_id] = handler
+ def subscribe(subscription_id, callback)
+ @callbacks[subscription_id] = callback
end
def unsubscribe(subscription_id)
- @subscriptions.delete(subscription_id)
+ @callbacks.delete(subscription_id)
end
def handle(notification)
- subscription_id = notification.dig('params', 'subscription')
+ subscription_id = notification.dig(:params, :subscription)
return if subscription_id.nil?
- if @subscriptions.key?(subscription_id)
- @subscriptions[subscription_id].call(notification)
- else
- # the subscription_id may be not registered.
- # in client.subscribe function,
- # ...
- # subscription_id = request(method, params)
- # @subscription_handler.subscribe(subscription_id, block)
- # ...
- # the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes.
- sleep 0.01
- handle(notification)
+ if @callbacks.key?(subscription_id)
+ @callbacks[subscription_id].call(notification)
end
end
end
end