lib/grpc/generic/bidi_call.rb in grpc-0.9.3 vs lib/grpc/generic/bidi_call.rb in grpc-0.9.4

- old
+ new

@@ -76,13 +76,15 @@ # block that can be invoked with each response. # # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - @enq_th = Thread.new { write_loop(requests) } + @enq_th = start_write_loop(requests) @loop_th = start_read_loop - each_queued_msg(&blk) + replies = each_queued_msg + return replies if blk.nil? + replies.each { |r| blk.call(r) } end # Begins orchestration of the Bidi stream for a server generating replies. # # N.B. gen_each_reply is a func(Enumerable<Requests>) @@ -94,12 +96,12 @@ # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) + @enq_th = start_write_loop(replys, is_client: false) @loop_th = start_read_loop - write_loop(replys, is_client: false) end private END_OF_READS = :end_of_reads @@ -111,79 +113,81 @@ # - iteration ends when the instance itself is added def each_queued_msg return enum_for(:each_queued_msg) unless block_given? count = 0 loop do - GRPC.logger.debug("each_queued_msg: waiting##{count}") + GRPC.logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @enq_th.join if @enq_th.alive? end - def write_loop(requests, is_client: true) - GRPC.logger.debug('bidi-write-loop: starting') - write_tag = Object.new - count = 0 - requests.each do |req| - GRPC.logger.debug("bidi-write-loop: #{count}") - count += 1 - payload = @marshal.call(req) - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_MESSAGE => payload) + # during bidi-streaming, read the requests to send from a separate thread + # read so that read_loop does not block waiting for requests to read. + def start_write_loop(requests, is_client: true) + Thread.new do # TODO: run on a thread pool + write_tag = Object.new + begin + count = 0 + requests.each do |req| + GRPC.logger.debug("bidi-write_loop: #{count}") + count += 1 + payload = @marshal.call(req) + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) + end + if is_client + GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status + end + rescue StandardError => e + GRPC.logger.warn('bidi-write_loop: failed') + GRPC.logger.warn(e) + raise e + end end - GRPC.logger.debug("bidi-write-loop: #{count} writes done") - if is_client - GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) - @call.status = batch_result.status - batch_result.check_status - GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") - end - GRPC.logger.debug('bidi-write-loop: finished') - rescue StandardError => e - GRPC.logger.warn('bidi-write-loop: failed') - GRPC.logger.warn(e) - raise e end # starts the read loop def start_read_loop Thread.new do - GRPC.logger.debug('bidi-read-loop: starting') begin read_tag = Object.new count = 0 + # queue the initial read before beginning the loop loop do - GRPC.logger.debug("bidi-read-loop: #{count}") + GRPC.logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) # handle the next message if batch_result.message.nil? - GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading + GRPC.logger.debug("received req: #{batch_result.message}") res = @unmarshal.call(batch_result.message) @readq.push(res) end + rescue StandardError => e - GRPC.logger.warn('bidi: read-loop failed') + GRPC.logger.warn('bidi: read_loop failed') GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end - GRPC.logger.debug('bidi-read-loop: finished') end end end end