lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.33.2 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.34.0
- old
+ new
@@ -13,23 +13,24 @@
# limitations under the License.
require "google/cloud/pubsub/service"
require "google/cloud/pubsub/subscriber/stream"
+require "google/cloud/pubsub/subscriber/timed_unary_buffer"
require "monitor"
module Google
module Cloud
- module Pubsub
+ module PubSub
##
# Subscriber object used to stream and process messages from a
- # Subscription. See {Google::Cloud::Pubsub::Subscription#listen}
+ # Subscription. See {Google::Cloud::PubSub::Subscription#listen}
#
# @example
# require "google/cloud/pubsub"
#
- # pubsub = Google::Cloud::Pubsub.new
+ # pubsub = Google::Cloud::PubSub.new
#
# sub = pubsub.subscription "my-topic-sub"
#
# subscriber = sub.listen do |received_message|
# # process message
@@ -66,11 +67,12 @@
attr_reader :subscription_name, :callback, :deadline, :streams,
:inventory, :callback_threads, :push_threads
##
# @private Implementation attributes.
- attr_reader :stream_inventory, :stream_pool, :thread_pool, :service
+ attr_reader :stream_inventory, :stream_pool, :thread_pool, :buffer,
+ :service
##
# @private Create an empty {Subscriber} object.
def initialize subscription_name, callback, deadline: nil, streams: nil,
inventory: nil, threads: {}, service: nil
@@ -92,10 +94,12 @@
stream_pool = Array.new(@streams) do
Thread.new { Stream.new self }
end
@stream_pool = stream_pool.map(&:value)
+ @buffer = TimedUnaryBuffer.new self
+
super() # to init MonitorMixin
end
##
# Starts the subscriber pulling from the subscription and processing the
@@ -105,10 +109,12 @@
def start
start_pool = synchronize do
@started = true
@stopped = false
+ # Start the buffer before the streams are all started
+ @buffer.start
@stream_pool.map do |stream|
Thread.new { stream.start }
end
end
start_pool.map(&:join)
@@ -131,10 +137,12 @@
@stream_pool.map do |stream|
Thread.new { stream.stop }
end
end
stop_pool.map(&:join)
+ # Stop the buffer after the streams are all stopped
+ synchronize { @buffer.stop }
self
end
##
@@ -183,11 +191,11 @@
# @yieldparam [Exception] error The error raised.
#
# @example
# require "google/cloud/pubsub"
#
- # pubsub = Google::Cloud::Pubsub.new
+ # pubsub = Google::Cloud::PubSub.new
#
# sub = pubsub.subscription "my-topic-sub"
#
# subscriber = sub.listen do |received_message|
# # process message
@@ -222,11 +230,11 @@
# @return [Exception, nil] error The most recent error raised.
#
# @example
# require "google/cloud/pubsub"
#
- # pubsub = Google::Cloud::Pubsub.new
+ # pubsub = Google::Cloud::PubSub.new
#
# sub = pubsub.subscription "my-topic-sub"
#
# subscriber = sub.listen do |received_message|
# # process message
@@ -250,10 +258,11 @@
def error! error
error_callbacks = synchronize do
@last_error = error
@error_callbacks
end
+ error_callbacks = default_error_callbacks if error_callbacks.empty?
error_callbacks.each { |error_callback| error_callback.call error }
end
##
# @private
@@ -265,9 +274,26 @@
##
# @private
def inspect
"#<#{self.class.name} #{self}>"
end
+
+ protected
+
+ def default_error_callbacks
+ # This is memoized to reduce calls to the configuration.
+ @default_error_callbacks ||= begin
+ error_callback = Google::Cloud::PubSub.configuration.on_error
+ error_callback ||= Google::Cloud.configure.on_error
+ if error_callback
+ [error_callback]
+ else
+ []
+ end
+ end
+ end
end
end
+
+ Pubsub = PubSub unless const_defined? :Pubsub
end
end