lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-1.4.0 vs lib/google/cloud/pubsub/subscriber.rb in google-cloud-pubsub-1.5.0
- old
+ new
@@ -102,10 +102,11 @@
##
# Starts the subscriber pulling from the subscription and processing the
# received messages.
#
# @return [Subscriber] returns self so calls can be chained.
+ #
def start
start_pool = synchronize do
@started = true
@stopped = false
@@ -127,10 +128,11 @@
# unprocessed messages will be released back to the API and redelivered.
# Use {#wait!} to block until the subscriber is fully stopped and all
# received messages have been processed or released.
#
# @return [Subscriber] returns self so calls can be chained.
+ #
def stop
stop_pool = synchronize do
@started = false
@stopped = true
@@ -156,10 +158,11 @@
#
# @param [Number, nil] timeout The number of seconds to block until the
# subscriber is fully stopped. Default will block indefinitely.
#
# @return [Subscriber] returns self so calls can be chained.
+ #
def wait! timeout = nil
wait_pool = synchronize do
@stream_pool.map do |stream|
Thread.new { stream.wait! timeout }
end
@@ -188,18 +191,20 @@
##
# Whether the subscriber has been started.
#
# @return [boolean] `true` when started, `false` otherwise.
+ #
def started?
synchronize { @started }
end
##
# Whether the subscriber has been stopped.
#
# @return [boolean] `true` when stopped, `false` otherwise.
+ #
def stopped?
synchronize { @stopped }
end
##
@@ -277,41 +282,61 @@
synchronize { @last_error }
end
##
# The number of received messages to be collected by subscriber. Default is 1,000.
+ #
+ # @return [Integer] The maximum number of messages.
+ #
def max_outstanding_messages
@inventory[:max_outstanding_messages]
end
# @deprecated Use {#max_outstanding_messages}.
alias inventory_limit max_outstanding_messages
# @deprecated Use {#max_outstanding_messages}.
alias inventory max_outstanding_messages
##
# The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
+ #
+ # @return [Integer] The maximum number of bytes.
+ #
def max_outstanding_bytes
@inventory[:max_outstanding_bytes]
end
# @deprecated Use {#max_outstanding_bytes}.
alias inventory_bytesize max_outstanding_bytes
##
# The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
+ #
+ # @return [Integer] The maximum number of seconds.
+ #
def max_total_lease_duration
@inventory[:max_total_lease_duration]
end
# @deprecated Use {#max_total_lease_duration}.
alias inventory_extension max_total_lease_duration
##
+ # The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message
+ # redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
+ #
+ # @return [Integer] The maximum number of seconds.
+ #
+ def max_duration_per_lease_extension
+ @inventory[:max_duration_per_lease_extension]
+ end
+
+ ##
# @private
def stream_inventory
{
- limit: @inventory[:max_outstanding_messages].fdiv(@streams).ceil,
- bytesize: @inventory[:max_outstanding_bytes].fdiv(@streams).ceil,
- extension: @inventory[:max_total_lease_duration]
+ limit: @inventory[:max_outstanding_messages].fdiv(@streams).ceil,
+ bytesize: @inventory[:max_outstanding_bytes].fdiv(@streams).ceil,
+ extension: @inventory[:max_total_lease_duration],
+ max_duration_per_lease_extension: @inventory[:max_duration_per_lease_extension]
}
end
# @private returns error object from the stream thread.
def error! error
@@ -349,9 +374,10 @@
@inventory = { max_outstanding_messages: @inventory }
end
@inventory[:max_outstanding_messages] = Integer(@inventory[:max_outstanding_messages] || 1000)
@inventory[:max_outstanding_bytes] = Integer(@inventory[:max_outstanding_bytes] || 100_000_000)
@inventory[:max_total_lease_duration] = Integer(@inventory[:max_total_lease_duration] || 3600)
+ @inventory[:max_duration_per_lease_extension] = Integer(@inventory[:max_duration_per_lease_extension] || 0)
end
def default_error_callbacks
# This is memoized to reduce calls to the configuration.
@default_error_callbacks ||= begin