lib/proletariat/subscriber.rb in proletariat-0.0.4 vs lib/proletariat/subscriber.rb in proletariat-0.0.5

- old
+ new

@@ -56,10 +56,12 @@ def on_task ready_acknowledgers.each do |acknowledger| acknowledger.acknowledge_on_channel channel acknowledgers.delete acknowledger end + + completed_retries.each { |r| scheduled_retries.delete r } end # Public: Purge the RabbitMQ queue. # # Returns nil. @@ -100,27 +102,64 @@ end nil end + # Internal: Get scheduled retries whose messages have been requeued. + # + # Returns an Array of Retrys. + def completed_retries + scheduled_retries.select { |r| r.requeued? } + end + + # Internal: Forwards all message bodies to listener#post. Auto-acks + # messages not meant for this subscriber's workers. + # + # Returns nil. + def handle_message(info, properties, body) + if handles_worker_type? properties.headers['worker'] + future = listener.post?(body, info.routing_key, properties.headers) + acknowledgers << Acknowledger.new(future, info.delivery_tag, { + message: body, key: info.routing_key, headers: properties.headers, + worker: queue_config.queue_name }, scheduled_retries) + else + channel.acknowledge info.delivery_tag + end + + nil + end + + # Internal: Checks if subscriber should handle message for given worker + # header. + # + # Returns true if should be handled or header is nil. + # Returns false if should not be handled. + def handles_worker_type?(worker_header) + [nil, queue_config.queue_name].include? worker_header + end + # Internal: Get acknowledgers for messages whose work has completed. # # Returns an Array of Acknowledgers. def ready_acknowledgers acknowledgers.select do |acknowledger| acknowledger.ready_to_acknowledge? end end + def scheduled_retries + @scheduled_retries ||= [] + end + # Internal: Starts a consumer on the queue. The consumer forwards all - # message bodies to listener#post. + # message bodies to listener#post. Auto-acks messages not meant + # for this subscriber's workers. # # Returns nil. def start_consumer @consumer = bunny_queue.subscribe ack: true do |info, properties, body| - future = listener.post?(body, info.routing_key) - acknowledgers << Acknowledger.new(future, info.delivery_tag) + handle_message info, properties, body nil end nil @@ -131,10 +170,11 @@ # # Returns nil. def stop_consumer @consumer.cancel if @consumer wait_for_acknowledgers if acknowledgers.any? + scheduled_retries.each { |r| r.expedite } nil end # Internal: Makes blocking calls for each unacknowledged message until all @@ -158,15 +198,19 @@ # acknowledgement. MAX_BLOCK_TIME = 5 # Public: Creates a new Acknowledger instance. # - # future - A future-like object holding the Worker response. - # delivery_tag - The RabbitMQ delivery tag to be used when ack/nacking. - def initialize(future, delivery_tag) - @future = future - @delivery_tag = delivery_tag + # future - A future-like object holding the Worker response. + # delivery_tag - The RabbitMQ delivery tag for ack/nacking. + # properties - The original message properties; for requeuing. + # scheduled_retries - An Array to hold any created Retrys. + def initialize(future, delivery_tag, properties, scheduled_retries) + @future = future + @delivery_tag = delivery_tag + @properties = properties + @scheduled_retries = scheduled_retries end # Public: Retrieves the value from the future and sends the relevant # acknowledgement on a given channel. Logs a warning if the # future value is unexpected. @@ -230,18 +274,99 @@ # channel - The Bunny::Channel to receive the acknowledgement. # # Returns nil. def acknowledge_error(channel) Proletariat.logger.error future.reason - channel.reject delivery_tag, true + scheduled_retries << Retry.new(properties) + channel.acknowledge delivery_tag + nil end # Internal: Returns the RabbitMQ delivery tag. attr_reader :delivery_tag # Internal: Returns the future-like object holding the Worker response. attr_reader :future + + # Internal: Returns the original message properties. + attr_reader :properties + + # Internal: Returns the Array of Retrys. + attr_reader :scheduled_retries + + # Internal: Used publish an exponential delayed requeue for failures. + class Retry + # Public: Creates a new Retry instance. Sets appropriate headers for + # requeue message. + # + # properties - The original message properties. + def initialize(properties) + @properties = properties + + properties[:headers]['failures'] = failures + properties[:headers]['worker'] = properties[:worker] + + @scheduled_task = Concurrent::ScheduledTask.new(retry_delay) do + requeue_message + end + end + + # Public: Attempt to requeue the message immediately if pending or + # wait for natural completion. + # + # Returns nil. + def expedite + if scheduled_task.cancel + requeue_message + else + scheduled_task.value + end + + nil + end + + # Public: Tests whether the message has been requeued. + # + # Returns a Boolean. + def requeued? + scheduled_task.fulfilled? + end + + private + + # Internal: Returns the original message properties. + attr_reader :properties + + # Internal: Returns the ScheduledTask which will requeue the message. + attr_reader :scheduled_task + + # Internal: Fetches the current number of message failures from the + # headers. Defaults to 1. + # + # Returns a Fixnum. + def failures + @failures ||= (properties[:headers]['failures'] || 0) + 1 + end + + # Internal: Performs the actual message requeue. + # + # Returns nil. + def requeue_message + Proletariat.publish(properties[:key], properties[:message], + properties[:headers]) + + nil + end + + # Internal: Calculates an exponential retry delay based on the previous + # number of failures. Capped with configuration setting. + # + # Returns the delay in seconds as a Fixnum. + def retry_delay + [2**failures, Proletariat.max_retry_delay].min + end + end end end end