lib/httpx/plugins/grpc.rb in httpx-0.24.7 vs lib/httpx/plugins/grpc.rb in httpx-1.0.0

- old
+ new

@@ -47,24 +47,23 @@ }.freeze class << self def load_dependencies(*) require "stringio" + require "httpx/plugins/grpc/grpc_encoding" require "httpx/plugins/grpc/message" require "httpx/plugins/grpc/call" end def configure(klass) klass.plugin(:persistent) - klass.plugin(:compression) klass.plugin(:stream) end def extra_options(options) options.merge( fallback_protocol: "h2", - http2_settings: { wait_for_handshake: false }, grpc_rpcs: {}.freeze, grpc_compression: false, grpc_deadline: DEADLINE ) end @@ -106,13 +105,22 @@ def merge_headers(trailers) @trailing_metadata = Hash[trailers] super end + end - def encoders - @options.encodings + module RequestBodyMethods + def initialize(headers, _) + super + + if (compression = headers["grpc-encoding"]) + deflater_body = self.class.initialize_deflater_body(@body, compression) + @body = Transcoder::GRPCEncoding.encode(deflater_body || @body, compressed: !deflater_body.nil?) + else + @body = Transcoder::GRPCEncoding.encode(@body, compressed: false) + end end end module InstanceMethods def with_channel_credentials(ca_path, key = nil, cert = nil, **ssl_opts) @@ -139,33 +147,21 @@ rpc_opts = { deadline: @options.grpc_deadline, }.merge(opts) - local_rpc_name = rpc_name.underscore - session_class = Class.new(self.class) do - # define rpc method with ruby style name class_eval(<<-OUT, __FILE__, __LINE__ + 1) - def #{local_rpc_name}(input, **opts) # def grpc_action(input, **opts) - rpc_execute("#{local_rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts) - end # end + def #{rpc_name}(input, **opts) # def grpc_action(input, **opts) + rpc_execute("#{rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts) + end # end OUT - - # define rpc method with original name - unless local_rpc_name == rpc_name - class_eval(<<-OUT, __FILE__, __LINE__ + 1) - def #{rpc_name}(input, **opts) # def grpcAction(input, **opts) - rpc_execute("#{local_rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts) - end # end - OUT - end end session_class.new(@options.merge( grpc_rpcs: @options.grpc_rpcs.merge( - local_rpc_name => [rpc_name, input, output, rpc_opts] + rpc_name.underscore => [rpc_name, input, output, rpc_opts] ).freeze )) end def build_stub(origin, service: nil, compression: false) @@ -243,41 +239,27 @@ rpc_method = "/#{rpc_method}" unless rpc_method.start_with?("/") rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service uri.path = rpc_method headers = HEADERS.merge( - "grpc-accept-encoding" => ["identity", *@options.encodings.keys] + "grpc-accept-encoding" => ["identity", *@options.supported_compression_formats] ) unless deadline == Float::INFINITY # convert to milliseconds deadline = (deadline * 1000.0).to_i headers["grpc-timeout"] = "#{deadline}m" end headers = headers.merge(metadata) if metadata # prepare compressor - deflater = nil compression = @options.grpc_compression == true ? "gzip" : @options.grpc_compression - if compression - headers["grpc-encoding"] = compression - deflater = @options.encodings[compression].deflater if @options.encodings.key?(compression) - end + headers["grpc-encoding"] = compression if compression headers.merge!(@options.call_credentials.call) if @options.call_credentials - body = if input.respond_to?(:each) - Enumerator.new do |y| - input.each do |message| - y << Message.encode(message, deflater: deflater) - end - end - else - Message.encode(input, deflater: deflater) - end - - build_request("POST", uri, headers: headers, body: body) + build_request("POST", uri, headers: headers, body: input) end end end register_plugin :grpc, GRPC end