module PipeRpc class Hub def initialize(args) @channel = args[:input] @socket = Socket.new(self, args) @requester = Requester.new(self) @responder = Responder.new(self) @clients = {} self.loop_iteration = nil end attr_reader :channel attr_accessor :logger def log(msg) if logger == :reflect notify(server: :reflect_logger, method: :log, arguments: [msg]) elsif logger.respond_to? :debug logger.debug msg elsif logger logger.call msg end end def add_server(servers) @responder.add_server(servers) end def rmv_server(server_name) @responder.rmv_server(server_name) end def on_incoming_request(&on_request) @responder.on_request(&on_request) end def client_for(server_name) @clients[server_name] ||= Client.new(self, server: server_name) end def loop_iteration=(proc) @loop_iteration = proc end def on_sent(&on_sent) @socket.on_write(&on_sent) end def on_received(&on_received) @socket.on_read(&on_received) end def notify(signature) @socket.write @requester.notification(signature) end def request(signature, &on_result) result = nil result_ready = false request = @requester.request(signature) do |*response| result = on_result.call(*response) result_ready = true end @socket.write request loop do return result if result_ready @loop_iteration ? @loop_iteration.call : handle_message end end def send_error(error = {}) @socket.write ErrorResponse.new(id: error.delete(:id), error: error) end def handle_message message = next_message # blocks return unless message if message.notification? @responder.handle_notification(message.handler) elsif message.request? @responder.handle_request(message.handler) { |response| @socket.write response } elsif message.response? @requester.handle_response(message.handler) elsif message.error? raise message.handler.native_error else send_error(id: nil, code: -32600, data: { message: 'no request, result or error' }) end end private def next_message Message.new(self, @socket.read) # blocks rescue => e send_error(id: nil, code: -32700, data: { message: e.message }) nil end end end