lib/songkick_queue/worker.rb in songkick_queue-0.2.0 vs lib/songkick_queue/worker.rb in songkick_queue-0.3.0

- old
+ new

@@ -65,32 +65,35 @@ # @param consumer_class [Class] to subscribe to def subscribe_to_queue(consumer_class) queue = channel.queue(consumer_class.queue_name, durable: true, arguments: { 'x-ha-policy' => 'all' }) - queue.subscribe(manual_ack: true) do |delivery_info, properties, payload| - process_message(consumer_class, delivery_info, properties, payload) + queue.subscribe(manual_ack: true) do |delivery_info, properties, message| + process_message(consumer_class, delivery_info, properties, message) end logger.info "Subscribed #{consumer_class} to #{consumer_class.queue_name}" end # Handle receipt of a subscribed message # # @param consumer_class [Class] that was subscribed to # @param delivery_info [Bunny::DeliveryInfo] # @param properties [Bunny::MessageProperties] - # @param payload [String] to deserialize - def process_message(consumer_class, delivery_info, properties, payload) - logger.info "Processing message via #{consumer_class}..." + # @param message [String] to deserialize + def process_message(consumer_class, delivery_info, properties, message) + message = JSON.parse(message, symbolize_names: true) + # Handle both old and new format of messages + # TODO: Tidy this up once messages always have a payload + payload = message.fetch(:payload, message) + + logger.info "Processing message via #{consumer_class}..." set_process_name(consumer_class) - message = JSON.parse(payload, symbolize_names: true) - consumer = consumer_class.new(delivery_info, logger) - consumer.process(message) + consumer.process(payload) rescue Object => exception logger.error(exception) ensure set_process_name channel.ack(delivery_info.delivery_tag, false) @@ -113,17 +116,16 @@ # Update the name of this process, as viewed in `ps` or `top` # # @example idle # set_process_name #=> "songkick_queue[idle]" - # @example consumer running - # set_process_name(TweetConsumer) #=> "songkick_queue[tweet_consumer]" + # @example consumer running, namespace is removed + # set_process_name(Foo::TweetConsumer) #=> "songkick_queue[TweetConsumer]" # @param status [String] of the program def set_process_name(status = 'idle') formatted_status = String(status) - .gsub('::', '') - .gsub(/([A-Z]+)/) { "_#{$1.downcase}" } - .sub(/^_(\w)/) { $1 } + .split('::') + .last $PROGRAM_NAME = "#{process_name}[#{formatted_status}]" end end end