lib/httpx/plugins/grpc.rb in httpx-0.24.7 vs lib/httpx/plugins/grpc.rb in httpx-1.0.0
- old
+ new
@@ -47,24 +47,23 @@
}.freeze
class << self
def load_dependencies(*)
require "stringio"
+ require "httpx/plugins/grpc/grpc_encoding"
require "httpx/plugins/grpc/message"
require "httpx/plugins/grpc/call"
end
def configure(klass)
klass.plugin(:persistent)
- klass.plugin(:compression)
klass.plugin(:stream)
end
def extra_options(options)
options.merge(
fallback_protocol: "h2",
- http2_settings: { wait_for_handshake: false },
grpc_rpcs: {}.freeze,
grpc_compression: false,
grpc_deadline: DEADLINE
)
end
@@ -106,13 +105,22 @@
def merge_headers(trailers)
@trailing_metadata = Hash[trailers]
super
end
+ end
- def encoders
- @options.encodings
+ module RequestBodyMethods
+ def initialize(headers, _)
+ super
+
+ if (compression = headers["grpc-encoding"])
+ deflater_body = self.class.initialize_deflater_body(@body, compression)
+ @body = Transcoder::GRPCEncoding.encode(deflater_body || @body, compressed: !deflater_body.nil?)
+ else
+ @body = Transcoder::GRPCEncoding.encode(@body, compressed: false)
+ end
end
end
module InstanceMethods
def with_channel_credentials(ca_path, key = nil, cert = nil, **ssl_opts)
@@ -139,33 +147,21 @@
rpc_opts = {
deadline: @options.grpc_deadline,
}.merge(opts)
- local_rpc_name = rpc_name.underscore
-
session_class = Class.new(self.class) do
- # define rpc method with ruby style name
class_eval(<<-OUT, __FILE__, __LINE__ + 1)
- def #{local_rpc_name}(input, **opts) # def grpc_action(input, **opts)
- rpc_execute("#{local_rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts)
- end # end
+ def #{rpc_name}(input, **opts) # def grpc_action(input, **opts)
+ rpc_execute("#{rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts)
+ end # end
OUT
-
- # define rpc method with original name
- unless local_rpc_name == rpc_name
- class_eval(<<-OUT, __FILE__, __LINE__ + 1)
- def #{rpc_name}(input, **opts) # def grpcAction(input, **opts)
- rpc_execute("#{local_rpc_name}", input, **opts) # rpc_execute("grpc_action", input, **opts)
- end # end
- OUT
- end
end
session_class.new(@options.merge(
grpc_rpcs: @options.grpc_rpcs.merge(
- local_rpc_name => [rpc_name, input, output, rpc_opts]
+ rpc_name.underscore => [rpc_name, input, output, rpc_opts]
).freeze
))
end
def build_stub(origin, service: nil, compression: false)
@@ -243,41 +239,27 @@
rpc_method = "/#{rpc_method}" unless rpc_method.start_with?("/")
rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service
uri.path = rpc_method
headers = HEADERS.merge(
- "grpc-accept-encoding" => ["identity", *@options.encodings.keys]
+ "grpc-accept-encoding" => ["identity", *@options.supported_compression_formats]
)
unless deadline == Float::INFINITY
# convert to milliseconds
deadline = (deadline * 1000.0).to_i
headers["grpc-timeout"] = "#{deadline}m"
end
headers = headers.merge(metadata) if metadata
# prepare compressor
- deflater = nil
compression = @options.grpc_compression == true ? "gzip" : @options.grpc_compression
- if compression
- headers["grpc-encoding"] = compression
- deflater = @options.encodings[compression].deflater if @options.encodings.key?(compression)
- end
+ headers["grpc-encoding"] = compression if compression
headers.merge!(@options.call_credentials.call) if @options.call_credentials
- body = if input.respond_to?(:each)
- Enumerator.new do |y|
- input.each do |message|
- y << Message.encode(message, deflater: deflater)
- end
- end
- else
- Message.encode(input, deflater: deflater)
- end
-
- build_request("POST", uri, headers: headers, body: body)
+ build_request("POST", uri, headers: headers, body: input)
end
end
end
register_plugin :grpc, GRPC
end