lib/proletariat/subscriber.rb in proletariat-0.0.6 vs lib/proletariat/subscriber.rb in proletariat-0.1.0
- old
+ new
@@ -1,95 +1,65 @@
module Proletariat
# Internal: Creates, binds and listens on a RabbitMQ queue. Forwards
# messages to a given listener.
- class Subscriber
- include Concurrent::Runnable
-
+ class Subscriber < Actor
include Concerns::Logging
# Public: Creates a new Subscriber instance.
#
# listener - Object to delegate new messages to.
# queue_config - A QueueConfig value object.
- def initialize(listener, queue_config)
- @listener = listener
- @queue_config = queue_config
+ def initialize(listener, queue_config, exception_handler_class)
+ @listener = listener
+ @queue_config = queue_config
+ @exception_handler_class = exception_handler_class
- @channel = Proletariat.connection.create_channel
-
- @channel.prefetch Proletariat.worker_threads
-
- @exchange = @channel.topic Proletariat.exchange_name, durable: true
- @bunny_queue = @channel.queue queue_config.queue_name,
- durable: true,
- auto_delete: queue_config.auto_delete
-
bind_queue
- end
-
- # Internal: Called by the Concurrent framework on run. Used here to start
- # consumption of the queue and to log the status of the
- # subscriber.
- #
- # Returns nil.
- def on_run
start_consumer
- log_info 'Now online'
- nil
- end
-
- # Internal: Called by the Concurrent framework on run. Used here to stop
- # consumption of the queue and to log the status of the
- # subscriber.
- #
- # Returns nil.
- def on_stop
- log_info 'Attempting graceful shutdown.'
- stop_consumer
- log_info 'Now offline'
- end
-
- # Internal: Called by the Concurrent framework to perform work. Used here
- # acknowledge RabbitMQ messages.
- #
- # Returns nil.
- def on_task
- ready_acknowledgers.each do |acknowledger|
- acknowledger.acknowledge_on_channel channel
- acknowledgers.delete acknowledger
+ @ticker = Concurrent::TimerTask.execute(execution: 5, timeout: 2) do
+ acknowledge_messages
end
-
- completed_retries.each { |r| scheduled_retries.delete r }
end
- # Public: Purge the RabbitMQ queue.
+ # Internal: Called on actor termination. Used to stop consumption off the
+ # queue and end the ticker.
#
# Returns nil.
- def purge
- bunny_queue.purge
+ def cleanup
+ @ticker.kill if @ticker
+ stop_consumer if @consumer
+ @channel.close if @channel && channel.open?
nil
end
private
- # Internal: Returns the Bunny::Queue in use.
- attr_reader :bunny_queue
+ # Internal: Returns the ExceptionHandler class.
+ attr_reader :exception_handler_class
- # Internal: Returns the Bunny::Channel in use.
- attr_reader :channel
-
- # Internal: Returns the Bunny::Exchange in use.
- attr_reader :exchange
-
# Internal: Returns the listener object.
attr_reader :listener
# Internal: Returns the queue_config in use.
attr_reader :queue_config
+ # Internal: Acknowledge processed messages.
+ #
+ # Returns nil.
+ def acknowledge_messages
+ ready_acknowledgers.each do |acknowledger|
+ acknowledger.acknowledge_on_channel channel
+ acknowledgers.delete acknowledger
+ end
+
+ nil
+ end
+
+ # Internal: Returns array of Acknowledgers which haven't acknowledged their
+ # messages.
def acknowledgers
@acknowledgers ||= []
end
# Internal: Binds bunny_queue to the exchange via each routing key
@@ -102,29 +72,49 @@
end
nil
end
- # Internal: Get scheduled retries whose messages have been requeued.
- #
- # Returns an Array of Retrys.
- def completed_retries
- scheduled_retries.select { |r| r.requeued? }
+ # Internal: Returns the Bunny::Queue in use.
+ def bunny_queue
+ @bunny_queue ||= channel.queue(queue_config.queue_name,
+ durable: !Proletariat.test_mode?,
+ auto_delete: Proletariat.test_mode?)
end
+ # Internal: Returns the Bunny::Channel in use.
+ def channel
+ @channel ||= Proletariat.connection.create_channel.tap do |channel|
+ channel.prefetch Proletariat.worker_threads + 1
+ end
+ end
+
+ def exception_handler
+ @exception_handler ||= exception_handler_class.spawn!(
+ name: "#{queue_config.worker_name}_exception_handler",
+ supervise: true, args: [queue_config.queue_name]
+ )
+ end
+
+ # Internal: Returns the Bunny::Exchange in use.
+ def exchange
+ @exchange ||= channel.topic(Proletariat.exchange_name,
+ durable: !Proletariat.test_mode?)
+ end
+
# Internal: Forwards all message bodies to listener#post. Auto-acks
# messages not meant for this subscriber's workers.
#
# Returns nil.
def handle_message(info, properties, body)
if handles_worker_type? properties.headers['worker']
- future = listener.post?(body, info.routing_key, properties.headers)
- acknowledgers << Acknowledger.new(future, info.delivery_tag, {
- message: body, key: info.routing_key, headers: properties.headers,
- worker: queue_config.queue_name }, scheduled_retries)
+ message = Message.new(info.routing_key, body, properties.headers)
+ ivar = listener.ask(message)
+ acknowledgers << Acknowledger.new(ivar, info.delivery_tag, message,
+ exception_handler)
else
- channel.acknowledge info.delivery_tag
+ channel.ack info.delivery_tag
end
nil
end
@@ -144,23 +134,21 @@
acknowledgers.select do |acknowledger|
acknowledger.ready_to_acknowledge?
end
end
- def scheduled_retries
- @scheduled_retries ||= []
- end
-
# Internal: Starts a consumer on the queue. The consumer forwards all
# message bodies to listener#post. Auto-acks messages not meant
# for this subscriber's workers.
#
# Returns nil.
def start_consumer
- @consumer = bunny_queue.subscribe ack: true do |info, properties, body|
- handle_message info, properties, body
+ @consumer = bunny_queue.subscribe manual_ack: true do |info, props, body|
+ acknowledge_messages
+ handle_message info, props, body
+
nil
end
nil
end
@@ -170,11 +158,10 @@
#
# Returns nil.
def stop_consumer
@consumer.cancel if @consumer
wait_for_acknowledgers if acknowledgers.any?
- scheduled_retries.each { |r| r.expedite }
nil
end
# Internal: Makes blocking calls for each unacknowledged message until all
@@ -192,38 +179,40 @@
end
# Internal: Used to watch the state of dispatched Work and send ack/nack
# to a RabbitMQ channel.
class Acknowledger
+ include Concerns::Logging
+
# Public: Maximum time in seconds to wait synchronously for an
# acknowledgement.
MAX_BLOCK_TIME = 5
# Public: Creates a new Acknowledger instance.
#
- # future - A future-like object holding the Worker response.
+ # ivar - A ivar-like object holding the Worker response.
# delivery_tag - The RabbitMQ delivery tag for ack/nacking.
- # properties - The original message properties; for requeuing.
- # scheduled_retries - An Array to hold any created Retrys.
- def initialize(future, delivery_tag, properties, scheduled_retries)
- @future = future
+ # message - The original message; for exception handling.
+ # exception_handler - A reference to an ExceptionHandler.
+ def initialize(ivar, delivery_tag, message, exception_handler)
+ @ivar = ivar
@delivery_tag = delivery_tag
- @properties = properties
- @scheduled_retries = scheduled_retries
+ @message = message
+ @exception_handler = exception_handler
end
- # Public: Retrieves the value from the future and sends the relevant
+ # Public: Retrieves the value from the ivar and sends the relevant
# acknowledgement on a given channel. Logs a warning if the
- # future value is unexpected.
+ # ivar value is unexpected.
#
# channel - The Bunny::Channel to receive the acknowledgement.
#
# Returns nil.
def acknowledge_on_channel(channel)
- if future.fulfilled?
+ if ivar.fulfilled?
acknowledge_success(channel)
- elsif future.rejected?
+ elsif ivar.rejected?
acknowledge_error(channel)
end
nil
end
@@ -232,21 +221,21 @@
#
# channel - The Bunny::Channel to receive the acknowledgement.
#
# Returns nil.
def block_until_acknowledged(channel)
- future.value(MAX_BLOCK_TIME)
+ ivar.wait(MAX_BLOCK_TIME)
acknowledge_on_channel(channel)
nil
end
- # Public: Gets the readiness of the future for acknowledgement use.
+ # Public: Gets the readiness of the ivar for acknowledgement use.
#
- # Returns true if future is fulfilled or rejected.
+ # Returns true if ivar is fulfilled or rejected.
def ready_to_acknowledge?
- future.state != :pending
+ ivar.completed?
end
private
# Internal: Dispatches acknowledgements for non-errored worker responses.
@@ -254,12 +243,12 @@
#
# channel - The Bunny::Channel to receive the acknowledgement.
#
# Returns nil.
def acknowledge_success(channel)
- case future.value
- when :ok then channel.acknowledge delivery_tag
+ case ivar.value
+ when :ok then channel.ack delivery_tag
when :drop then channel.reject delivery_tag, false
when :requeue then channel.reject delivery_tag, true
else
Proletariat.logger.warn 'Unexpected return value from #work.'
channel.reject delivery_tag, false
@@ -273,102 +262,27 @@
#
# channel - The Bunny::Channel to receive the acknowledgement.
#
# Returns nil.
def acknowledge_error(channel)
- Proletariat.logger.error future.reason
+ Proletariat.logger.error ivar.reason
- scheduled_retries << Retry.new(properties)
- channel.acknowledge delivery_tag
+ exception_handler << message
+ channel.ack delivery_tag
nil
end
# Internal: Returns the RabbitMQ delivery tag.
attr_reader :delivery_tag
- # Internal: Returns the future-like object holding the Worker response.
- attr_reader :future
+ # Internal: Returns the ExceptionHandler reference.
+ attr_reader :exception_handler
- # Internal: Returns the original message properties.
- attr_reader :properties
+ # Internal: Returns the ivar-like object holding the Worker response.
+ attr_reader :ivar
- # Internal: Returns the Array of Retrys.
- attr_reader :scheduled_retries
-
- # Internal: Used publish an exponential delayed requeue for failures.
- class Retry
- # Public: Creates a new Retry instance. Sets appropriate headers for
- # requeue message.
- #
- # properties - The original message properties.
- def initialize(properties)
- @properties = properties
-
- properties[:headers]['failures'] = failures
- properties[:headers]['worker'] = properties[:worker]
-
- @scheduled_task = Concurrent::ScheduledTask.new(retry_delay) do
- requeue_message
- end
-
- @scheduled_task.execute
- end
-
- # Public: Attempt to requeue the message immediately if pending or
- # wait for natural completion.
- #
- # Returns nil.
- def expedite
- if scheduled_task.cancel
- requeue_message
- else
- scheduled_task.value
- end
-
- nil
- end
-
- # Public: Tests whether the message has been requeued.
- #
- # Returns a Boolean.
- def requeued?
- scheduled_task.fulfilled?
- end
-
- private
-
- # Internal: Returns the original message properties.
- attr_reader :properties
-
- # Internal: Returns the ScheduledTask which will requeue the message.
- attr_reader :scheduled_task
-
- # Internal: Fetches the current number of message failures from the
- # headers. Defaults to 1.
- #
- # Returns a Fixnum.
- def failures
- @failures ||= (properties[:headers]['failures'] || 0) + 1
- end
-
- # Internal: Performs the actual message requeue.
- #
- # Returns nil.
- def requeue_message
- Proletariat.publish(properties[:key], properties[:message],
- properties[:headers])
-
- nil
- end
-
- # Internal: Calculates an exponential retry delay based on the previous
- # number of failures. Capped with configuration setting.
- #
- # Returns the delay in seconds as a Fixnum.
- def retry_delay
- [2**failures, Proletariat.max_retry_delay].min
- end
- end
+ # Internal: Returns the original message.
+ attr_reader :message
end
end
end