lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.31.1 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.32.0
- old
+ new
@@ -72,10 +72,11 @@
##
# @private Create an empty {Subscriber} object.
def initialize subscription_name, callback, deadline: nil, streams: nil,
inventory: nil, threads: {}, service: nil
@callback = callback
+ @error_callbacks = []
@subscription_name = subscription_name
@deadline = deadline || 60
@streams = streams || 4
@inventory = inventory || 1000
@callback_threads = (threads[:callback] || 8).to_i
@@ -165,9 +166,93 @@
# Whether the subscriber has been stopped.
#
# @return [boolean] `true` when stopped, `false` otherwise.
def stopped?
synchronize { @stopped }
+ end
+
+ ##
+ # Register to be notified of errors when raised.
+ #
+ # If an unhandled error has occurred the subscriber will attempt to
+ # recover from the error and resume listening.
+ #
+ # Multiple error handlers can be added.
+ #
+ # @yield [callback] The block to be called when an error is raised.
+ # @yieldparam [Exception] error The error raised.
+ #
+ # @example
+ # require "google/cloud/pubsub"
+ #
+ # pubsub = Google::Cloud::Pubsub.new
+ #
+ # sub = pubsub.subscription "my-topic-sub"
+ #
+ # subscriber = sub.listen do |received_message|
+ # # process message
+ # received_message.acknowledge!
+ # end
+ #
+ # # Register to be notified when unhandled errors occur.
+ # subscriber.on_error do |error|
+ # # log error
+ # puts error
+ # end
+ #
+ # # Start listening for messages and errors.
+ # subscriber.start
+ #
+ # # Shut down the subscriber when ready to stop receiving messages.
+ # subscriber.stop.wait!
+ #
+ def on_error &block
+ synchronize do
+ @error_callbacks << block
+ end
+ end
+
+ ##
+ # The most recent unhandled error to occur while listening to messages
+ # on the subscriber.
+ #
+ # If an unhandled error has occurred the subscriber will attempt to
+ # recover from the error and resume listening.
+ #
+ # @return [Exception, nil] error The most recent error raised.
+ #
+ # @example
+ # require "google/cloud/pubsub"
+ #
+ # pubsub = Google::Cloud::Pubsub.new
+ #
+ # sub = pubsub.subscription "my-topic-sub"
+ #
+ # subscriber = sub.listen do |received_message|
+ # # process message
+ # received_message.acknowledge!
+ # end
+ #
+ # # Start listening for messages and errors.
+ # subscriber.start
+ #
+ # # If an error was raised, it can be retrieved here:
+ # subscriber.last_error #=> nil
+ #
+ # # Shut down the subscriber when ready to stop receiving messages.
+ # subscriber.stop.wait!
+ #
+ def last_error
+ synchronize { @last_error }
+ end
+
+ # @private returns error object from the stream thread.
+ def error! error
+ error_callbacks = synchronize do
+ @last_error = error
+ @error_callbacks
+ end
+ error_callbacks.each { |error_callback| error_callback.call error }
end
##
# @private
def to_s