lib/grpc_kit/streams/server.rb in grpc_kit-0.1.2 vs lib/grpc_kit/streams/server.rb in grpc_kit-0.1.3

- old
+ new

@@ -1,35 +1,53 @@ # frozen_string_literal: true require 'forwardable' require 'grpc_kit/stream' +require 'grpc_kit/status_codes' module GrpcKit module Streams class Server extend Forwardable delegate %i[each recv] => :@stream - def initialize(protobuf:, session:, stream:) - @protobuf = protobuf - @session = session - @stream = GrpcKit::Stream.new(protobuf: @protobuf, session: @session, stream: stream) + def initialize(stream:, session:, config:) + @stream = GrpcKit::Stream.new(protobuf: config.protobuf, session: session, stream: stream) + @config = config @sent_first_msg = false end def send_msg(data, last: false) - @stream.send(data, last: last) + if last + @stream.send_trailer # TODO: pass trailer metadata + end + @stream.send(data, last: last, limit_size: @config.max_send_message_size) return if @sent_first_msg - @session.submit_response( - @stream.stream_id, - ':status' => '200', - 'content-type' => 'application/grpc', - 'accept-encoding' => 'identity', - ) + @stream.submit_response + @sent_first_msg = true + end + + def recv(last: false) + data = @stream.recv(last: last, limit_size: @config.max_receive_message_size) + raise StopIteration if data.nil? + + data + end + + def send_trailer + @stream.send_trailer # TODO: pass trailer metadata + @stream.end_write + end + + def send_status(status: GrpcKit::StatusCodes::INTERNAL, msg: nil, metadata: {}) + @stream.send_trailer(status: status, msg: msg, metadata: metadata) + return if @sent_first_msg + + @stream.submit_response(piggyback_trailer: true) @sent_first_msg = true end end end end