lib/grpc/generic/bidi_call.rb in grpc-0.10.0 vs lib/grpc/generic/bidi_call.rb in grpc-0.11.0
- old
+ new
@@ -54,19 +54,17 @@
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Fixnum] the deadline for the call to complete
- def initialize(call, q, marshal, unmarshal, deadline)
+ def initialize(call, q, marshal, unmarshal)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
- @deadline = deadline
@marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
end
@@ -97,11 +95,11 @@
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- @loop_th = start_read_loop
+ @loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false)
end
private
@@ -125,11 +123,11 @@
loop do
GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
- throw req if req.is_a? StandardError
+ fail req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
end
@@ -145,16 +143,13 @@
SEND_MESSAGE => payload)
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
- batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
- RECV_STATUS_ON_CLIENT => nil)
- @call.status = batch_result.status
- batch_result.check_status
- GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil)
+ GRPC.logger.debug('bidi-write-loop: done')
notify_done
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
@@ -162,11 +157,11 @@
notify_done
raise e
end
# starts the read loop
- def start_read_loop
+ def start_read_loop(is_client: true)
Thread.new do
GRPC.logger.debug('bidi-read-loop: starting')
begin
read_tag = Object.new
count = 0
@@ -175,12 +170,22 @@
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
+
# handle the next message
if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
+
+ if is_client
+ batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
+ RECV_STATUS_ON_CLIENT => nil)
+ @call.status = batch_result.status
+ batch_result.check_status
+ GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
+ end
+
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end