lib/grpc/generic/bidi_call.rb in grpc-0.9.3 vs lib/grpc/generic/bidi_call.rb in grpc-0.9.4
- old
+ new
@@ -76,13 +76,15 @@
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- @enq_th = Thread.new { write_loop(requests) }
+ @enq_th = start_write_loop(requests)
@loop_th = start_read_loop
- each_queued_msg(&blk)
+ replies = each_queued_msg
+ return replies if blk.nil?
+ replies.each { |r| blk.call(r) }
end
# Begins orchestration of the Bidi stream for a server generating replies.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
@@ -94,12 +96,12 @@
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
+ @enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop
- write_loop(replys, is_client: false)
end
private
END_OF_READS = :end_of_reads
@@ -111,79 +113,81 @@
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
- GRPC.logger.debug("each_queued_msg: waiting##{count}")
+ GRPC.logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @enq_th.join if @enq_th.alive?
end
- def write_loop(requests, is_client: true)
- GRPC.logger.debug('bidi-write-loop: starting')
- write_tag = Object.new
- count = 0
- requests.each do |req|
- GRPC.logger.debug("bidi-write-loop: #{count}")
- count += 1
- payload = @marshal.call(req)
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_MESSAGE => payload)
+ # during bidi-streaming, read the requests to send from a separate thread
+ # read so that read_loop does not block waiting for requests to read.
+ def start_write_loop(requests, is_client: true)
+ Thread.new do # TODO: run on a thread pool
+ write_tag = Object.new
+ begin
+ count = 0
+ requests.each do |req|
+ GRPC.logger.debug("bidi-write_loop: #{count}")
+ count += 1
+ payload = @marshal.call(req)
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_MESSAGE => payload)
+ end
+ if is_client
+ GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
+ end
+ rescue StandardError => e
+ GRPC.logger.warn('bidi-write_loop: failed')
+ GRPC.logger.warn(e)
+ raise e
+ end
end
- GRPC.logger.debug("bidi-write-loop: #{count} writes done")
- if is_client
- GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
- batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
- RECV_STATUS_ON_CLIENT => nil)
- @call.status = batch_result.status
- batch_result.check_status
- GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
- end
- GRPC.logger.debug('bidi-write-loop: finished')
- rescue StandardError => e
- GRPC.logger.warn('bidi-write-loop: failed')
- GRPC.logger.warn(e)
- raise e
end
# starts the read loop
def start_read_loop
Thread.new do
- GRPC.logger.debug('bidi-read-loop: starting')
begin
read_tag = Object.new
count = 0
+
# queue the initial read before beginning the loop
loop do
- GRPC.logger.debug("bidi-read-loop: #{count}")
+ GRPC.logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
# handle the next message
if batch_result.message.nil?
- GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
+ GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
+
rescue StandardError => e
- GRPC.logger.warn('bidi: read-loop failed')
+ GRPC.logger.warn('bidi: read_loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
- GRPC.logger.debug('bidi-read-loop: finished')
end
end
end
end