lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.31.1 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-0.32.0

- old
+ new

@@ -72,10 +72,11 @@ ## # @private Create an empty {Subscriber} object. def initialize subscription_name, callback, deadline: nil, streams: nil, inventory: nil, threads: {}, service: nil @callback = callback + @error_callbacks = [] @subscription_name = subscription_name @deadline = deadline || 60 @streams = streams || 4 @inventory = inventory || 1000 @callback_threads = (threads[:callback] || 8).to_i @@ -165,9 +166,93 @@ # Whether the subscriber has been stopped. # # @return [boolean] `true` when stopped, `false` otherwise. def stopped? synchronize { @stopped } + end + + ## + # Register to be notified of errors when raised. + # + # If an unhandled error has occurred the subscriber will attempt to + # recover from the error and resume listening. + # + # Multiple error handlers can be added. + # + # @yield [callback] The block to be called when an error is raised. + # @yieldparam [Exception] error The error raised. + # + # @example + # require "google/cloud/pubsub" + # + # pubsub = Google::Cloud::Pubsub.new + # + # sub = pubsub.subscription "my-topic-sub" + # + # subscriber = sub.listen do |received_message| + # # process message + # received_message.acknowledge! + # end + # + # # Register to be notified when unhandled errors occur. + # subscriber.on_error do |error| + # # log error + # puts error + # end + # + # # Start listening for messages and errors. + # subscriber.start + # + # # Shut down the subscriber when ready to stop receiving messages. + # subscriber.stop.wait! + # + def on_error &block + synchronize do + @error_callbacks << block + end + end + + ## + # The most recent unhandled error to occur while listening to messages + # on the subscriber. + # + # If an unhandled error has occurred the subscriber will attempt to + # recover from the error and resume listening. + # + # @return [Exception, nil] error The most recent error raised. + # + # @example + # require "google/cloud/pubsub" + # + # pubsub = Google::Cloud::Pubsub.new + # + # sub = pubsub.subscription "my-topic-sub" + # + # subscriber = sub.listen do |received_message| + # # process message + # received_message.acknowledge! + # end + # + # # Start listening for messages and errors. + # subscriber.start + # + # # If an error was raised, it can be retrieved here: + # subscriber.last_error #=> nil + # + # # Shut down the subscriber when ready to stop receiving messages. + # subscriber.stop.wait! + # + def last_error + synchronize { @last_error } + end + + # @private returns error object from the stream thread. + def error! error + error_callbacks = synchronize do + @last_error = error + @error_callbacks + end + error_callbacks.each { |error_callback| error_callback.call error } end ## # @private def to_s