# Copyright 2017 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require "google/cloud/pubsub/subscriber/async_pusher" require "google/cloud/pubsub/subscriber/enumerator_queue" require "google/cloud/pubsub/service" require "google/cloud/errors" require "monitor" require "concurrent" module Google module Cloud module Pubsub class Subscriber ## # @private class Stream include MonitorMixin ## # @private Implementation attributes. attr_reader :callback_thread_pool, :push_thread_pool ## # Subscriber attributes. attr_reader :subscriber ## # @private Create an empty Subscriber::Stream object. def initialize subscriber @subscriber = subscriber @request_queue = nil @stopped = nil @paused = nil @pause_cond = new_cond @inventory = Inventory.new self, subscriber.stream_inventory @callback_thread_pool = Concurrent::FixedThreadPool.new \ subscriber.callback_threads @push_thread_pool = Concurrent::FixedThreadPool.new \ subscriber.push_threads super() # to init MonitorMixin end def start synchronize do break if @request_queue start_streaming! end self end def stop synchronize do break if @stopped @stopped = true @inventory.stop # signal to the background thread that we are unpaused @pause_cond.broadcast end self end def stopped? synchronize { @stopped } end def paused? synchronize { @paused } end def wait! synchronize do # Now that the reception thread is dead, make sure all recieved # messages have had the callback called. @callback_thread_pool.shutdown @callback_thread_pool.wait_for_termination # Once all the callbacks are complete, we can stop the publisher # and send the final request to the steeam. if @async_pusher request = @async_pusher.stop if request Concurrent::Future.new(executor: @push_thread_pool) do @request_queue.push request end.execute end end # Close the push thread pool now that the pusher is closed. @push_thread_pool.shutdown @push_thread_pool.wait_for_termination # Close the stream now that all requests have been made. @request_queue.push self unless @request_queue.nil? end self end ## # @private def acknowledge *messages ack_ids = coerce_ack_ids messages return true if ack_ids.empty? synchronize do @async_pusher ||= AsyncPusher.new self @async_pusher.acknowledge ack_ids @inventory.remove ack_ids unpause_streaming! end true end ## # @private def delay deadline, *messages mod_ack_ids = coerce_ack_ids messages return true if mod_ack_ids.empty? synchronize do @async_pusher ||= AsyncPusher.new self @async_pusher.delay deadline, mod_ack_ids @inventory.remove mod_ack_ids unpause_streaming! end true end def async_pusher synchronize { @async_pusher } end def push request synchronize { @request_queue.push request } end def inventory synchronize { @inventory } end ## # @private def delay_inventory! synchronize do return true if @inventory.empty? @async_pusher ||= AsyncPusher.new self @async_pusher.delay subscriber.deadline, @inventory.ack_ids end true end # @private def to_s format "(inventory: %<inv>i, status: %<sts>s)", inv: inventory.count, sts: status end # @private def inspect "#<#{self.class.name} #{self}>" end protected # rubocop:disable all def background_run enum loop do synchronize do if @paused && !@stopped @pause_cond.wait next end end # Break loop, close thread if stopped break if synchronize { @stopped } begin # Cannot syncronize the enumerator, causes deadlock response = enum.next # Create a list of all the received ack_id values received_ack_ids = response.received_messages.map(&:ack_id) synchronize do # Create receipt of received messages reception @async_pusher ||= AsyncPusher.new self @async_pusher.delay subscriber.deadline, received_ack_ids # Add received messages to inventory @inventory.add received_ack_ids end response.received_messages.each do |rec_msg_grpc| rec_msg = ReceivedMessage.from_grpc(rec_msg_grpc, self) synchronize do # Call user provided code for received message perform_callback_async rec_msg end end synchronize { pause_streaming! } rescue StopIteration break end end rescue GRPC::DeadlineExceeded, GRPC::Unavailable, GRPC::Cancelled, GRPC::ResourceExhausted, GRPC::Internal # The GAPIC layer will raise DeadlineExceeded when stream is opened # longer than the timeout value it is configured for. When this # happends, restart the stream stealthly. # Also stealthly restart the stream on Unavailable, Cancelled, # ResourceExhausted, and Internal. synchronize { start_streaming! } rescue StandardError => e raise Google::Cloud::Error.from_error(e) end # rubocop:enable all def perform_callback_async rec_msg Concurrent::Future.new(executor: callback_thread_pool) do subscriber.callback.call rec_msg end.execute end def start_streaming! # Don't allow a stream to restart if already stopped return if @stopped # signal to the previous queue to shut down old_queue = [] old_queue = @request_queue.quit_and_dump_queue if @request_queue @request_queue = EnumeratorQueue.new self @request_queue.push initial_input_request old_queue.each { |obj| @request_queue.push obj } output_enum = subscriber.service.streaming_pull @request_queue.each @stopped = nil @paused = nil # create new background thread to handle new enumerator @background_thread = Thread.new(output_enum) do |enum| background_run enum end end def pause_streaming! return unless pause_streaming? @paused = true end def pause_streaming? return if @stopped return if @paused @inventory.full? end def unpause_streaming! return unless unpause_streaming? @paused = nil # signal to the background thread that we are unpaused @pause_cond.broadcast end def unpause_streaming? return if @stopped return if @paused.nil? @inventory.count < @inventory.limit * 0.8 end def initial_input_request Google::Pubsub::V1::StreamingPullRequest.new.tap do |req| req.subscription = subscriber.subscription_name req.stream_ack_deadline_seconds = subscriber.deadline req.modify_deadline_ack_ids += @inventory.ack_ids req.modify_deadline_seconds += \ @inventory.ack_ids.map { subscriber.deadline } end end ## # Makes sure the values are the `ack_id`. If given several # {ReceivedMessage} objects extract the `ack_id` values. def coerce_ack_ids messages Array(messages).flatten.map do |msg| msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s end end def status return "not started" if @background_thread.nil? status = @background_thread.status return "error" if status.nil? return "stopped" if status == false status end ## # @private class Inventory include MonitorMixin attr_reader :stream, :limit def initialize stream, limit @stream = stream @limit = limit @_ack_ids = [] @wait_cond = new_cond super() end def ack_ids @_ack_ids end def add *ack_ids ack_ids = Array(ack_ids).flatten synchronize do @_ack_ids += ack_ids unless @stopped @background_thread ||= Thread.new { background_run } end end end def remove *ack_ids ack_ids = Array(ack_ids).flatten synchronize do @_ack_ids -= ack_ids if @_ack_ids.empty? if @background_thread @background_thread.kill @background_thread = nil end end end end def count synchronize do @_ack_ids.count end end def empty? synchronize do @_ack_ids.empty? end end def stop synchronize do @stopped = true @background_thread.kill if @background_thread end end def full? count >= limit end protected def background_run until synchronize { @stopped } delay = calc_delay synchronize { @wait_cond.wait delay } stream.delay_inventory! end end def calc_delay (stream.subscriber.deadline - 3) * rand(0.8..0.9) end end end end end end end