lib/grpc/generic/bidi_call.rb in grpc-0.9.4 vs lib/grpc/generic/bidi_call.rb in grpc-0.10.0

- old
+ new

@@ -64,27 +64,28 @@ end @call = call @cq = q @deadline = deadline @marshal = marshal + @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal end # Begins orchestration of the Bidi stream for a client sending requests. # # The method either returns an Enumerator of the responses, or accepts a # block that can be invoked with each response. # # @param requests the Enumerable of requests to send + # @op_notifier a Notifier used to signal completion # @return an Enumerator of requests to yield - def run_on_client(requests, &blk) - @enq_th = start_write_loop(requests) + def run_on_client(requests, op_notifier, &blk) + @op_notifier = op_notifier + @enq_th = Thread.new { write_loop(requests) } @loop_th = start_read_loop - replies = each_queued_msg - return replies if blk.nil? - replies.each { |r| blk.call(r) } + each_queued_msg(&blk) end # Begins orchestration of the Bidi stream for a server generating replies. # # N.B. gen_each_reply is a func(Enumerable<Requests>) @@ -96,98 +97,105 @@ # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @enq_th = start_write_loop(replys, is_client: false) @loop_th = start_read_loop + write_loop(replys, is_client: false) end private END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes + # signals that bidi operation is complete + def notify_done + return unless @op_notifier + GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") + @op_notifier.notify(self) + end + # each_queued_msg yields each message on this instances readq # # - messages are added to the readq by #read_loop # - iteration ends when the instance itself is added def each_queued_msg return enum_for(:each_queued_msg) unless block_given? count = 0 loop do - GRPC.logger.debug("each_queued_msg: msg##{count}") + GRPC.logger.debug("each_queued_msg: waiting##{count}") count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end - @enq_th.join if @enq_th.alive? end - # during bidi-streaming, read the requests to send from a separate thread - # read so that read_loop does not block waiting for requests to read. - def start_write_loop(requests, is_client: true) - Thread.new do # TODO: run on a thread pool - write_tag = Object.new - begin - count = 0 - requests.each do |req| - GRPC.logger.debug("bidi-write_loop: #{count}") - count += 1 - payload = @marshal.call(req) - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_MESSAGE => payload) - end - if is_client - GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) - batch_result.check_status - end - rescue StandardError => e - GRPC.logger.warn('bidi-write_loop: failed') - GRPC.logger.warn(e) - raise e - end + def write_loop(requests, is_client: true) + GRPC.logger.debug('bidi-write-loop: starting') + write_tag = Object.new + count = 0 + requests.each do |req| + GRPC.logger.debug("bidi-write-loop: #{count}") + count += 1 + payload = @marshal.call(req) + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) end + GRPC.logger.debug("bidi-write-loop: #{count} writes done") + if is_client + GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") + notify_done + end + GRPC.logger.debug('bidi-write-loop: finished') + rescue StandardError => e + GRPC.logger.warn('bidi-write-loop: failed') + GRPC.logger.warn(e) + notify_done + raise e end # starts the read loop def start_read_loop Thread.new do + GRPC.logger.debug('bidi-read-loop: starting') begin read_tag = Object.new count = 0 - # queue the initial read before beginning the loop loop do - GRPC.logger.debug("bidi-read_loop: #{count}") + GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) # handle the next message if batch_result.message.nil? + GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading - GRPC.logger.debug("received req: #{batch_result.message}") res = @unmarshal.call(batch_result.message) @readq.push(res) end - rescue StandardError => e - GRPC.logger.warn('bidi: read_loop failed') + GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end + GRPC.logger.debug('bidi-read-loop: finished') end end end end