lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.33.2 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.34.0

- old
+ new

@@ -13,23 +13,24 @@ # limitations under the License. require "google/cloud/pubsub/service" require "google/cloud/pubsub/subscriber/stream" +require "google/cloud/pubsub/subscriber/timed_unary_buffer" require "monitor" module Google module Cloud - module Pubsub + module PubSub ## # Subscriber object used to stream and process messages from a - # Subscription. See {Google::Cloud::Pubsub::Subscription#listen} + # Subscription. See {Google::Cloud::PubSub::Subscription#listen} # # @example # require "google/cloud/pubsub" # - # pubsub = Google::Cloud::Pubsub.new + # pubsub = Google::Cloud::PubSub.new # # sub = pubsub.subscription "my-topic-sub" # # subscriber = sub.listen do |received_message| # # process message @@ -66,11 +67,12 @@ attr_reader :subscription_name, :callback, :deadline, :streams, :inventory, :callback_threads, :push_threads ## # @private Implementation attributes. - attr_reader :stream_inventory, :stream_pool, :thread_pool, :service + attr_reader :stream_inventory, :stream_pool, :thread_pool, :buffer, + :service ## # @private Create an empty {Subscriber} object. def initialize subscription_name, callback, deadline: nil, streams: nil, inventory: nil, threads: {}, service: nil @@ -92,10 +94,12 @@ stream_pool = Array.new(@streams) do Thread.new { Stream.new self } end @stream_pool = stream_pool.map(&:value) + @buffer = TimedUnaryBuffer.new self + super() # to init MonitorMixin end ## # Starts the subscriber pulling from the subscription and processing the @@ -105,10 +109,12 @@ def start start_pool = synchronize do @started = true @stopped = false + # Start the buffer before the streams are all started + @buffer.start @stream_pool.map do |stream| Thread.new { stream.start } end end start_pool.map(&:join) @@ -131,10 +137,12 @@ @stream_pool.map do |stream| Thread.new { stream.stop } end end stop_pool.map(&:join) + # Stop the buffer after the streams are all stopped + synchronize { @buffer.stop } self end ## @@ -183,11 +191,11 @@ # @yieldparam [Exception] error The error raised. # # @example # require "google/cloud/pubsub" # - # pubsub = Google::Cloud::Pubsub.new + # pubsub = Google::Cloud::PubSub.new # # sub = pubsub.subscription "my-topic-sub" # # subscriber = sub.listen do |received_message| # # process message @@ -222,11 +230,11 @@ # @return [Exception, nil] error The most recent error raised. # # @example # require "google/cloud/pubsub" # - # pubsub = Google::Cloud::Pubsub.new + # pubsub = Google::Cloud::PubSub.new # # sub = pubsub.subscription "my-topic-sub" # # subscriber = sub.listen do |received_message| # # process message @@ -250,10 +258,11 @@ def error! error error_callbacks = synchronize do @last_error = error @error_callbacks end + error_callbacks = default_error_callbacks if error_callbacks.empty? error_callbacks.each { |error_callback| error_callback.call error } end ## # @private @@ -265,9 +274,26 @@ ## # @private def inspect "#<#{self.class.name} #{self}>" end + + protected + + def default_error_callbacks + # This is memoized to reduce calls to the configuration. + @default_error_callbacks ||= begin + error_callback = Google::Cloud::PubSub.configuration.on_error + error_callback ||= Google::Cloud.configure.on_error + if error_callback + [error_callback] + else + [] + end + end + end end end + + Pubsub = PubSub unless const_defined? :Pubsub end end