Sha256: 4fa13272e4e1b0d1e7e3e00932a1984b5ad5fdceda7740d0305fb34a36ead370
Contents?: true
Size: 1.53 KB
Versions: 3
Compression:
Stored size: 1.53 KB
Contents
# frozen_string_literal: true

require 'grpc_kit/call'
require 'grpc_kit/calls'

module GrpcKit
  module Calls::Client
    # Client-side call object for a bidirectional streaming RPC.
    #
    # Requests are written with {#send_msg}, responses are read with {#recv}
    # or iterated with {#each} (the class mixes in Enumerable), and the
    # sending side is finished with {#close_and_send}.
    class BidiStreamer < GrpcKit::Call
      include Enumerable

      alias outgoing_metadata metadata

      # Forwards all keyword arguments to GrpcKit::Call and sets up the
      # synchronization state used to coordinate senders and receivers.
      def initialize(**)
        super

        # Held for the whole duration of #each.
        @recv_mutex = Mutex.new

        # @send flips to true once the first message has been written;
        # #recv waits on @send_cv until that has happened.
        @send = false
        @send_cv = Thread::ConditionVariable.new
        @send_mutex = Mutex.new
      end

      # Send one request message to the peer and wake up any receiver that
      # is waiting for the first message to be sent.
      #
      # @param data [Object] request message
      # @return [void]
      # @raise [RuntimeError] when a previous {#recv} recorded an error status
      def send_msg(data)
        raise "Upstream returns an error status: #{@reason}" if @reason

        @send_mutex.synchronize do
          @stream.send_msg(data, metadata: outgoing_metadata)
          @send = true
          @send_cv.broadcast
        end
      end

      # Receive a message from peer. This method is not thread safe, never call from multiple threads at once.
      #
      # Blocks until at least one message has been sent with {#send_msg},
      # then performs a blocking read on the underlying stream.
      #
      # @return [Object] response object
      # @raise [StopIteration] when the stream returns no message
      # @raise [GrpcKit::Errors::BadStatus] re-raised after being remembered,
      #   so that later {#send_msg} calls fail fast
      def recv
        # Cheap unsynchronized check first; the flag is re-checked under the
        # mutex before (and after) every wait.
        @send_mutex.synchronize { @send_cv.wait(@send_mutex) until @send } unless @send

        msg = @stream.recv_msg(blocking: true)
        return msg if msg

        raise StopIteration
      rescue GrpcKit::Errors::BadStatus => e
        @reason = e
        raise
      end

      # Delegates to the underlying stream's +close_and_send+ while holding
      # the send mutex, so it cannot interleave with {#send_msg}.
      # NOTE(review): presumably this half-closes the sending side of the
      # stream - confirm against the stream implementation.
      def close_and_send
        @send_mutex.synchronize do
          @stream.close_and_send
        end
      end

      # Iterate over the responses of the call. Iteration ends when {#recv}
      # raises StopIteration, which Kernel#loop rescues.
      #
      # @yieldparam response [Object] each response object of bidi streaming RPC
      # @return [Enumerator] when called without a block
      def each
        # Follow the Enumerable convention instead of consuming a message
        # and then failing with LocalJumpError when no block is given.
        return to_enum(:each) unless block_given?

        @recv_mutex.synchronize do
          loop { yield(recv) }
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
grpc_kit-0.5.1 | lib/grpc_kit/calls/client_bidi_streamer.rb |
grpc_kit-0.5.0 | lib/grpc_kit/calls/client_bidi_streamer.rb |
grpc_kit-0.4.0 | lib/grpc_kit/calls/client_bidi_streamer.rb |