lib/grpc/generic/bidi_call.rb in grpc-0.10.0 vs lib/grpc/generic/bidi_call.rb in grpc-0.11.0

- old
+ new

@@ -54,19 +54,17 @@ # @param call [Call] the call used by the ActiveCall # @param q [CompletionQueue] the completion queue used to accept # the call # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Fixnum] the deadline for the call to complete - def initialize(call, q, marshal, unmarshal, deadline) + def initialize(call, q, marshal, unmarshal) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') end @call = call @cq = q - @deadline = deadline @marshal = marshal @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal end @@ -97,11 +95,11 @@ # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @loop_th = start_read_loop + @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end private @@ -125,11 +123,11 @@ loop do GRPC.logger.debug("each_queued_msg: waiting##{count}") count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") - throw req if req.is_a? StandardError + fail req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end end @@ -145,16 +143,13 @@ SEND_MESSAGE => payload) end GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) - @call.status = batch_result.status - batch_result.check_status - GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil) + GRPC.logger.debug('bidi-write-loop: done') notify_done end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') @@ -162,11 +157,11 @@ notify_done raise e end # starts the read loop - def start_read_loop + def start_read_loop(is_client: true) Thread.new do GRPC.logger.debug('bidi-read-loop: starting') begin read_tag = Object.new count = 0 @@ -175,12 +170,22 @@ GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) + # handle the next message if batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") + + if is_client + batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") + end + @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break end