lib/songkick_queue/worker.rb in songkick_queue-0.5.0 vs lib/songkick_queue/worker.rb in songkick_queue-0.6.0
- old
+ new
@@ -81,19 +81,28 @@
# @param properties [Bunny::MessageProperties]
# @param message [String] to deserialize
def process_message(consumer_class, delivery_info, properties, message)
message = JSON.parse(message, symbolize_names: true)
- # Handle both old and new format of messages
- # TODO: Tidy this up once messages always have a payload
- payload = message.fetch(:payload, message)
+ message_id = message.fetch(:message_id)
+ produced_at = message.fetch(:produced_at)
+ payload = message.fetch(:payload)
- logger.info "Processing message via #{consumer_class}..."
- set_process_name(consumer_class)
+ logger.info "Processing message #{message_id} via #{consumer_class}, produced at #{produced_at}"
+ set_process_name(consumer_class, message_id)
consumer = consumer_class.new(delivery_info, logger)
- consumer.process(payload)
+
+ instrumentation_options = {
+ consumer_class: consumer_class.to_s,
+ queue_name: consumer_class.queue_name,
+ message_id: message_id,
+ produced_at: produced_at,
+ }
+ ActiveSupport::Notifications.instrument('consume_message.songkick_queue', instrumentation_options) do
+ consumer.process(payload)
+ end
rescue Object => exception
logger.error(exception)
ensure
set_process_name
channel.ack(delivery_info.delivery_tag, false)
@@ -114,16 +123,21 @@
# Update the name of this process, as viewed in `ps` or `top`
#
# @example idle
# set_process_name #=> "songkick_queue[idle]"
# @example consumer running, namespace is removed
- # set_process_name(Foo::TweetConsumer) #=> "songkick_queue[TweetConsumer]"
+ # set_process_name(Foo::TweetConsumer, 'a729bcd8') #=> "songkick_queue[TweetConsumer#a729bcd8]"
# @param status [String] of the program
- def set_process_name(status = 'idle')
+ # @param message_id [String] identifying the message currently being consumed
+ def set_process_name(status = 'idle', message_id = nil)
formatted_status = String(status)
.split('::')
.last
- $PROGRAM_NAME = "#{process_name}[#{formatted_status}]"
+ ident = [formatted_status, message_id]
+ .compact
+ .join('#')
+
+ $PROGRAM_NAME = "#{process_name}[#{ident}]"
end
end
end