lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-1.4.0 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-1.5.0

- old
+ new

@@ -102,10 +102,11 @@ ## # Starts the subscriber pulling from the subscription and processing the # received messages. # # @return [Subscriber] returns self so calls can be chained. + # def start start_pool = synchronize do @started = true @stopped = false @@ -127,10 +128,11 @@ # unprocessed messages will be released back to the API and redelivered. # Use {#wait!} to block until the subscriber is fully stopped and all # received messages have been processed or released. # # @return [Subscriber] returns self so calls can be chained. + # def stop stop_pool = synchronize do @started = false @stopped = true @@ -156,10 +158,11 @@ # # @param [Number, nil] timeout The number of seconds to block until the # subscriber is fully stopped. Default will block indefinitely. # # @return [Subscriber] returns self so calls can be chained. + # def wait! timeout = nil wait_pool = synchronize do @stream_pool.map do |stream| Thread.new { stream.wait! timeout } end @@ -188,18 +191,20 @@ ## # Whether the subscriber has been started. # # @return [boolean] `true` when started, `false` otherwise. + # def started? synchronize { @started } end ## # Whether the subscriber has been stopped. # # @return [boolean] `true` when stopped, `false` otherwise. + # def stopped? synchronize { @stopped } end ## @@ -277,41 +282,61 @@ synchronize { @last_error } end ## # The number of received messages to be collected by subscriber. Default is 1,000. + # + # @return [Integer] The maximum number of messages. + # def max_outstanding_messages @inventory[:max_outstanding_messages] end # @deprecated Use {#max_outstanding_messages}. alias inventory_limit max_outstanding_messages # @deprecated Use {#max_outstanding_messages}. alias inventory max_outstanding_messages ## # The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB). + # + # @return [Integer] The maximum number of bytes. + # def max_outstanding_bytes @inventory[:max_outstanding_bytes] end # @deprecated Use {#max_outstanding_bytes}. alias inventory_bytesize max_outstanding_bytes ## # The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour). + # + # @return [Integer] The maximum number of seconds. + # def max_total_lease_duration @inventory[:max_total_lease_duration] end # @deprecated Use {#max_total_lease_duration}. alias inventory_extension max_total_lease_duration ## + # The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message + # redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled). + # + # @return [Integer] The maximum number of seconds. + # + def max_duration_per_lease_extension + @inventory[:max_duration_per_lease_extension] + end + + ## # @private def stream_inventory { - limit: @inventory[:max_outstanding_messages].fdiv(@streams).ceil, - bytesize: @inventory[:max_outstanding_bytes].fdiv(@streams).ceil, - extension: @inventory[:max_total_lease_duration] + limit: @inventory[:max_outstanding_messages].fdiv(@streams).ceil, + bytesize: @inventory[:max_outstanding_bytes].fdiv(@streams).ceil, + extension: @inventory[:max_total_lease_duration], + max_duration_per_lease_extension: @inventory[:max_duration_per_lease_extension] } end # @private returns error object from the stream thread. def error! error @@ -349,9 +374,10 @@ @inventory = { max_outstanding_messages: @inventory } end @inventory[:max_outstanding_messages] = Integer(@inventory[:max_outstanding_messages] || 1000) @inventory[:max_outstanding_bytes] = Integer(@inventory[:max_outstanding_bytes] || 100_000_000) @inventory[:max_total_lease_duration] = Integer(@inventory[:max_total_lease_duration] || 3600) + @inventory[:max_duration_per_lease_extension] = Integer(@inventory[:max_duration_per_lease_extension] || 0) end def default_error_callbacks # This is memoized to reduce calls to the configuration. @default_error_callbacks ||= begin