lib/songkick_queue/worker.rb in songkick_queue-0.5.0 vs lib/songkick_queue/worker.rb in songkick_queue-0.6.0

- old
+ new

@@ -81,19 +81,28 @@ # @param properties [Bunny::MessageProperties] # @param message [String] to deserialize def process_message(consumer_class, delivery_info, properties, message) message = JSON.parse(message, symbolize_names: true) - # Handle both old and new format of messages - # TODO: Tidy this up once messages always have a payload - payload = message.fetch(:payload, message) + message_id = message.fetch(:message_id) + produced_at = message.fetch(:produced_at) + payload = message.fetch(:payload) - logger.info "Processing message via #{consumer_class}..." - set_process_name(consumer_class) + logger.info "Processing message #{message_id} via #{consumer_class}, produced at #{produced_at}" + set_process_name(consumer_class, message_id) consumer = consumer_class.new(delivery_info, logger) - consumer.process(payload) + + instrumentation_options = { + consumer_class: consumer_class.to_s, + queue_name: consumer_class.queue_name, + message_id: message_id, + produced_at: produced_at, + } + ActiveSupport::Notifications.instrument('consume_message.songkick_queue', instrumentation_options) do + consumer.process(payload) + end rescue Object => exception logger.error(exception) ensure set_process_name channel.ack(delivery_info.delivery_tag, false) @@ -114,16 +123,21 @@ # Update the name of this process, as viewed in `ps` or `top` # # @example idle # set_process_name #=> "songkick_queue[idle]" # @example consumer running, namespace is removed - # set_process_name(Foo::TweetConsumer) #=> "songkick_queue[TweetConsumer]" + # set_process_name(Foo::TweetConsumer, 'a729bcd8') #=> "songkick_queue[TweetConsumer#a729bcd8]" # @param status [String] of the program - def set_process_name(status = 'idle') + # @param message_id [String] identifying the message currently being consumed + def set_process_name(status = 'idle', message_id = nil) formatted_status = String(status) .split('::') .last - $PROGRAM_NAME = "#{process_name}[#{formatted_status}]" + ident = [formatted_status, message_id] + .compact + .join('#') + + $PROGRAM_NAME = "#{process_name}[#{ident}]" end end end