lib/pwwka/receiver.rb in pwwka-0.5.2 vs lib/pwwka/receiver.rb in pwwka-0.6.0
- old
+ new
@@ -25,13 +25,13 @@
receiver.topic_queue.subscribe(manual_ack: true, block: block) do |delivery_info, properties, payload|
begin
payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload))
handler_klass.handle!(delivery_info, properties, payload)
receiver.ack(delivery_info.delivery_tag)
- info "Processed Message on #{queue_name} -> #{payload}, #{delivery_info.routing_key}"
+ logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
rescue => e
- error "Error Processing Message on #{queue_name} -> #{payload}, #{delivery_info.routing_key}: #{e}"
+ logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, at: :error
# no requeue
receiver.nack(delivery_info.delivery_tag)
end
end
rescue Interrupt => _
@@ -45,10 +45,10 @@
def topic_queue
@topic_queue ||= begin
queue = channel.queue(queue_name, durable: true)
queue.bind(topic_exchange, routing_key: routing_key)
queue
- end
+ end
end
def ack(delivery_tag)
channel.acknowledge(delivery_tag, false)
end