lib/pwwka/receiver.rb in pwwka-0.6.0 vs lib/pwwka/receiver.rb in pwwka-0.7.0.RC1
- old
+ new
@@ -27,13 +27,17 @@
payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload))
handler_klass.handle!(delivery_info, properties, payload)
receiver.ack(delivery_info.delivery_tag)
logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
rescue => e
- logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, at: :error
- # no requeue
- receiver.nack(delivery_info.delivery_tag)
+ if Pwwka.configuration.requeue_on_error && !delivery_info.redelivered
+ logf "Retrying an Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error
+ receiver.nack_requeue(delivery_info.delivery_tag)
+ else
+ logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error
+ receiver.nack(delivery_info.delivery_tag)
+ end
end
end
rescue Interrupt => _
# TODO: trap TERM within channel.work_pool
info "Interrupting queue #{queue_name} subscriber safely"
@@ -42,11 +46,11 @@
return receiver
end
def topic_queue
@topic_queue ||= begin
- queue = channel.queue(queue_name, durable: true)
+ queue = channel.queue(queue_name, durable: true, arguments: {})
queue.bind(topic_exchange, routing_key: routing_key)
queue
end
end
@@ -61,17 +65,17 @@
def nack_requeue(delivery_tag)
channel.nack(delivery_tag, false, true)
end
def drop_queue
- topic_queue.purge
+ topic_queue.purge
topic_queue.delete
end
def test_teardown
drop_queue
topic_exchange.delete
- channel_connector.connection_close
+ channel_connector.connection_close
end
end
end