lib/proletariat/subscriber.rb in proletariat-0.0.4 vs lib/proletariat/subscriber.rb in proletariat-0.0.5
- old
+ new
@@ -56,10 +56,12 @@
def on_task
ready_acknowledgers.each do |acknowledger|
acknowledger.acknowledge_on_channel channel
acknowledgers.delete acknowledger
end
+
+ completed_retries.each { |r| scheduled_retries.delete r }
end
# Public: Purge the RabbitMQ queue.
#
# Returns nil.
@@ -100,27 +102,64 @@
end
nil
end
+ # Internal: Get scheduled retries whose messages have been requeued.
+ #
+ # Returns an Array of Retrys.
+ def completed_retries
+ scheduled_retries.select { |r| r.requeued? }
+ end
+
+ # Internal: Forwards all message bodies to listener#post. Auto-acks
+ # messages not meant for this subscriber's workers.
+ #
+ # Returns nil.
+ def handle_message(info, properties, body)
+ if handles_worker_type? properties.headers['worker']
+ future = listener.post?(body, info.routing_key, properties.headers)
+ acknowledgers << Acknowledger.new(future, info.delivery_tag, {
+ message: body, key: info.routing_key, headers: properties.headers,
+ worker: queue_config.queue_name }, scheduled_retries)
+ else
+ channel.acknowledge info.delivery_tag
+ end
+
+ nil
+ end
+
+ # Internal: Checks if subscriber should handle message for given worker
+ # header.
+ #
+ # Returns true if should be handled or header is nil.
+ # Returns false if should not be handled.
+ def handles_worker_type?(worker_header)
+ [nil, queue_config.queue_name].include? worker_header
+ end
+
# Internal: Get acknowledgers for messages whose work has completed.
#
# Returns an Array of Acknowledgers.
def ready_acknowledgers
acknowledgers.select do |acknowledger|
acknowledger.ready_to_acknowledge?
end
end
+ def scheduled_retries
+ @scheduled_retries ||= []
+ end
+
# Internal: Starts a consumer on the queue. The consumer forwards all
- # message bodies to listener#post.
+ # message bodies to listener#post. Auto-acks messages not meant
+ # for this subscriber's workers.
#
# Returns nil.
def start_consumer
@consumer = bunny_queue.subscribe ack: true do |info, properties, body|
- future = listener.post?(body, info.routing_key)
- acknowledgers << Acknowledger.new(future, info.delivery_tag)
+ handle_message info, properties, body
nil
end
nil
@@ -131,10 +170,11 @@
#
# Returns nil.
def stop_consumer
@consumer.cancel if @consumer
wait_for_acknowledgers if acknowledgers.any?
+ scheduled_retries.each { |r| r.expedite }
nil
end
# Internal: Makes blocking calls for each unacknowledged message until all
@@ -158,15 +198,19 @@
# acknowledgement.
MAX_BLOCK_TIME = 5
# Public: Creates a new Acknowledger instance.
#
- # future - A future-like object holding the Worker response.
- # delivery_tag - The RabbitMQ delivery tag to be used when ack/nacking.
- def initialize(future, delivery_tag)
- @future = future
- @delivery_tag = delivery_tag
+ # future - A future-like object holding the Worker response.
+ # delivery_tag - The RabbitMQ delivery tag for ack/nacking.
+ # properties - The original message properties; for requeuing.
+ # scheduled_retries - An Array to hold any created Retrys.
+ def initialize(future, delivery_tag, properties, scheduled_retries)
+ @future = future
+ @delivery_tag = delivery_tag
+ @properties = properties
+ @scheduled_retries = scheduled_retries
end
# Public: Retrieves the value from the future and sends the relevant
# acknowledgement on a given channel. Logs a warning if the
# future value is unexpected.
@@ -230,18 +274,99 @@
# channel - The Bunny::Channel to receive the acknowledgement.
#
# Returns nil.
def acknowledge_error(channel)
Proletariat.logger.error future.reason
- channel.reject delivery_tag, true
+ scheduled_retries << Retry.new(properties)
+ channel.acknowledge delivery_tag
+
nil
end
# Internal: Returns the RabbitMQ delivery tag.
attr_reader :delivery_tag
# Internal: Returns the future-like object holding the Worker response.
attr_reader :future
+
+ # Internal: Returns the original message properties.
+ attr_reader :properties
+
+ # Internal: Returns the Array of Retrys.
+ attr_reader :scheduled_retries
+
+ # Internal: Used publish an exponential delayed requeue for failures.
+ class Retry
+ # Public: Creates a new Retry instance. Sets appropriate headers for
+ # requeue message.
+ #
+ # properties - The original message properties.
+ def initialize(properties)
+ @properties = properties
+
+ properties[:headers]['failures'] = failures
+ properties[:headers]['worker'] = properties[:worker]
+
+ @scheduled_task = Concurrent::ScheduledTask.new(retry_delay) do
+ requeue_message
+ end
+ end
+
+ # Public: Attempt to requeue the message immediately if pending or
+ # wait for natural completion.
+ #
+ # Returns nil.
+ def expedite
+ if scheduled_task.cancel
+ requeue_message
+ else
+ scheduled_task.value
+ end
+
+ nil
+ end
+
+ # Public: Tests whether the message has been requeued.
+ #
+ # Returns a Boolean.
+ def requeued?
+ scheduled_task.fulfilled?
+ end
+
+ private
+
+ # Internal: Returns the original message properties.
+ attr_reader :properties
+
+ # Internal: Returns the ScheduledTask which will requeue the message.
+ attr_reader :scheduled_task
+
+ # Internal: Fetches the current number of message failures from the
+ # headers. Defaults to 1.
+ #
+ # Returns a Fixnum.
+ def failures
+ @failures ||= (properties[:headers]['failures'] || 0) + 1
+ end
+
+ # Internal: Performs the actual message requeue.
+ #
+ # Returns nil.
+ def requeue_message
+ Proletariat.publish(properties[:key], properties[:message],
+ properties[:headers])
+
+ nil
+ end
+
+ # Internal: Calculates an exponential retry delay based on the previous
+ # number of failures. Capped with configuration setting.
+ #
+ # Returns the delay in seconds as a Fixnum.
+ def retry_delay
+ [2**failures, Proletariat.max_retry_delay].min
+ end
+ end
end
end
end