lib/songkick_queue/worker.rb in songkick_queue-0.2.0 vs lib/songkick_queue/worker.rb in songkick_queue-0.3.0
- old
+ new
@@ -65,32 +65,35 @@
# @param consumer_class [Class] to subscribe to
def subscribe_to_queue(consumer_class)
queue = channel.queue(consumer_class.queue_name, durable: true,
arguments: { 'x-ha-policy' => 'all' })
- queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
- process_message(consumer_class, delivery_info, properties, payload)
+ queue.subscribe(manual_ack: true) do |delivery_info, properties, message|
+ process_message(consumer_class, delivery_info, properties, message)
end
logger.info "Subscribed #{consumer_class} to #{consumer_class.queue_name}"
end
# Handle receipt of a subscribed message
#
# @param consumer_class [Class] that was subscribed to
# @param delivery_info [Bunny::DeliveryInfo]
# @param properties [Bunny::MessageProperties]
- # @param payload [String] to deserialize
- def process_message(consumer_class, delivery_info, properties, payload)
- logger.info "Processing message via #{consumer_class}..."
+ # @param message [String] to deserialize
+ def process_message(consumer_class, delivery_info, properties, message)
+ message = JSON.parse(message, symbolize_names: true)
+ # Handle both old and new format of messages
+ # TODO: Tidy this up once messages always have a payload
+ payload = message.fetch(:payload, message)
+
+ logger.info "Processing message via #{consumer_class}..."
set_process_name(consumer_class)
- message = JSON.parse(payload, symbolize_names: true)
-
consumer = consumer_class.new(delivery_info, logger)
- consumer.process(message)
+ consumer.process(payload)
rescue Object => exception
logger.error(exception)
ensure
set_process_name
channel.ack(delivery_info.delivery_tag, false)
@@ -113,17 +116,16 @@
# Update the name of this process, as viewed in `ps` or `top`
#
# @example idle
# set_process_name #=> "songkick_queue[idle]"
- # @example consumer running
- # set_process_name(TweetConsumer) #=> "songkick_queue[tweet_consumer]"
+ # @example consumer running, namespace is removed
+ # set_process_name(Foo::TweetConsumer) #=> "songkick_queue[TweetConsumer]"
# @param status [String] of the program
def set_process_name(status = 'idle')
formatted_status = String(status)
- .gsub('::', '')
- .gsub(/([A-Z]+)/) { "_#{$1.downcase}" }
- .sub(/^_(\w)/) { $1 }
+ .split('::')
+ .last
$PROGRAM_NAME = "#{process_name}[#{formatted_status}]"
end
end
end