lib/grpc_kit/streams/client.rb in grpc_kit-0.1.2 vs lib/grpc_kit/streams/client.rb in grpc_kit-0.1.3

- old
+ new

@@ -1,33 +1,37 @@ # frozen_string_literal: true require 'grpc_kit/stream' require 'grpc_kit/streams/send_buffer' +require 'grpc_kit/status_codes' module GrpcKit module Streams class Client - def initialize(path:, protobuf:, session:, authority:) - @path = path + def initialize(session:, config:, authority:) + @config = config @session = session - @protobuf = protobuf @stream = nil @authority = authority end def send_msg(data, metadata: {}, timeout: nil, last: false) if @stream + # unless metadata.empty? + # raise 'You can attach metadata at first send_msg' # XXX + # end + unless @stream.end_write? @session.resume_data(@stream.stream_id) end else headers = build_headers(metadata: metadata, timeout: timeout) stream = @session.start_request(GrpcKit::Streams::SendBuffer.new, headers) - @stream = GrpcKit::Stream.new(protobuf: @protobuf, session: @session, stream: stream) + @stream = GrpcKit::Stream.new(protobuf: @config.protobuf, session: @session, stream: stream) end - @stream.send(data, last: last) + @stream.send(data, last: last, limit_size: @config.max_send_message_size) @session.run_once end def each(&block) unless @stream @@ -40,11 +44,18 @@ def recv(last: false) unless @stream raise 'You should call `send` method to send data' end - @stream.recv(last: last) + data = @stream.recv(last: last, limit_size: @config.max_receive_message_size) + + if data.nil? + check_status! + raise StopIteration + end + + data end def close_and_recv unless @stream raise 'You should call `send` method to send data' @@ -56,21 +67,36 @@ @stream.end_write @session.start(@stream.stream_id) @stream.end_read + check_status! + data = [] @stream.each { |d| data.push(d) } data end private + def check_status! + # XXX: wait until half close (remote) to get grpc-status + until @stream.close_remote? + @session.run_once + end + + if @stream.headers.grpc_status != GrpcKit::StatusCodes::OK + raise GrpcKit::Errors.from_status_code(@stream.headers.grpc_status, @stream.headers.status_message) + else + GrpcKit.logger.debug('request is success') + end + end + def build_headers(metadata: {}, timeout: nil, **headers) hdrs = metadata.merge(headers).merge( ':method' => 'POST', ':scheme' => 'http', - ':path' => @path, + ':path' => @config.path, ':authority' => @authority, 'te' => 'trailers', 'content-type' => 'application/grpc', 'user-agent' => "grpc-ruby/#{GrpcKit::VERSION} (grpc_kit)", 'grpc-accept-encoding' => 'identity,deflate,gzip',