lib/grpc_kit/streams/client.rb in grpc_kit-0.1.2 vs lib/grpc_kit/streams/client.rb in grpc_kit-0.1.3
- old
+ new
@@ -1,33 +1,37 @@
# frozen_string_literal: true
require 'grpc_kit/stream'
require 'grpc_kit/streams/send_buffer'
+require 'grpc_kit/status_codes'
module GrpcKit
module Streams
class Client
- def initialize(path:, protobuf:, session:, authority:)
- @path = path
+ def initialize(session:, config:, authority:)
+ @config = config
@session = session
- @protobuf = protobuf
@stream = nil
@authority = authority
end
def send_msg(data, metadata: {}, timeout: nil, last: false)
if @stream
+ # unless metadata.empty?
+ # raise 'You can attach metadata at first send_msg' # XXX
+ # end
+
unless @stream.end_write?
@session.resume_data(@stream.stream_id)
end
else
headers = build_headers(metadata: metadata, timeout: timeout)
stream = @session.start_request(GrpcKit::Streams::SendBuffer.new, headers)
- @stream = GrpcKit::Stream.new(protobuf: @protobuf, session: @session, stream: stream)
+ @stream = GrpcKit::Stream.new(protobuf: @config.protobuf, session: @session, stream: stream)
end
- @stream.send(data, last: last)
+ @stream.send(data, last: last, limit_size: @config.max_send_message_size)
@session.run_once
end
def each(&block)
unless @stream
@@ -40,11 +44,18 @@
def recv(last: false)
unless @stream
raise 'You should call `send` method to send data'
end
- @stream.recv(last: last)
+ data = @stream.recv(last: last, limit_size: @config.max_receive_message_size)
+
+ if data.nil?
+ check_status!
+ raise StopIteration
+ end
+
+ data
end
def close_and_recv
unless @stream
raise 'You should call `send` method to send data'
@@ -56,21 +67,36 @@
@stream.end_write
@session.start(@stream.stream_id)
@stream.end_read
+ check_status!
+
data = []
@stream.each { |d| data.push(d) }
data
end
private
+ def check_status!
+ # XXX: wait until half close (remote) to get grpc-status
+ until @stream.close_remote?
+ @session.run_once
+ end
+
+ if @stream.headers.grpc_status != GrpcKit::StatusCodes::OK
+ raise GrpcKit::Errors.from_status_code(@stream.headers.grpc_status, @stream.headers.status_message)
+ else
+ GrpcKit.logger.debug('request is success')
+ end
+ end
+
def build_headers(metadata: {}, timeout: nil, **headers)
hdrs = metadata.merge(headers).merge(
':method' => 'POST',
':scheme' => 'http',
- ':path' => @path,
+ ':path' => @config.path,
':authority' => @authority,
'te' => 'trailers',
'content-type' => 'application/grpc',
'user-agent' => "grpc-ruby/#{GrpcKit::VERSION} (grpc_kit)",
'grpc-accept-encoding' => 'identity,deflate,gzip',