# frozen_string_literal: true require 'grpc_kit/grpc_time' require 'grpc_kit/session/io' require 'grpc_kit/session/client_session' require 'grpc_kit/stream/client_stream' require 'grpc_kit/transport/client_transport' module GrpcKit class Client # @param sock [TCPSocket] # @param authority [nil, String] # @param interceptors [Array] list of interceptors # @param timeout [nil, Integer, String] # @param max_receive_message_size [Integer, nil] Specify the default maximum size of inbound message in bytes. Default to 4MB. # @param max_send_message_size [Integer, nil] Specify the default maximum size of outbound message in bytes. Default to 4MB. def initialize(sock, authority: nil, interceptors: [], timeout: nil, max_receive_message_size: nil, max_send_message_size: nil) @sock = sock @authority = if authority authority else addr = sock.addr "#{addr[3]}:#{addr[1]}" end @timeout = timeout && GrpcKit::GrpcTime.new(timeout) # Defined at GrpcKit::Grpc::Dsl#.rpc_stub_class build_rpcs( interceptors, max_receive_message_size: max_receive_message_size, max_send_message_size: max_send_message_size, ) end # @param rpc [GrpcKit::Rpcs::Client::RequestResponse] # @param request [Object] # @param opts [Hash] def request_response(rpc, request, **opts) GrpcKit.logger.debug('Calling request_respose') do_request(rpc, request, **opts) end # @param rpc [GrpcKit::Rpcs::Client::ClientStreamer] # @param opts [Hash] def client_streamer(rpc, **opts) GrpcKit.logger.debug('Calling client_streamer') do_request(rpc, nil, **opts) end # @param rpc [GrpcKit::Rpcs::Client::ServerStreamer] # @param request [Object] # @param opts [Hash] def server_streamer(rpc, request, **opts) GrpcKit.logger.debug('Calling server_streamer') do_request(rpc, request, **opts) end # @param rpc [GrpcKit::Rpcs::Client::ServerStreamer] # @param _requests [Object] it's for compatibility, no use # @param opts [Hash] def bidi_streamer(rpc, _requests, **opts) GrpcKit.logger.debug('Calling bidi_streamer') do_request(rpc, nil, **opts) end private def do_request(rpc, request, **opts) t = GrpcKit::Transport::ClientTransport.new(session) timeout = (opts[:timeout] && GrpcKit::GrpcTime.new(opts[:timeout])) || @timeout cs = GrpcKit::Stream::ClientStream.new(t, rpc.config, authority: @authority, timeout: timeout) rpc.invoke(cs, request, timeout: timeout, **opts) end def session @session ||= begin s = GrpcKit::Session::ClientSession.new(GrpcKit::Session::IO.new(@sock)) s.submit_settings([]) s end end end end