lib/pwwka/receiver.rb in pwwka-0.7.0.RC1 vs lib/pwwka/receiver.rb in pwwka-0.7.0
- old
+ new
@@ -27,17 +27,13 @@
payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload))
handler_klass.handle!(delivery_info, properties, payload)
receiver.ack(delivery_info.delivery_tag)
logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
rescue => e
- if Pwwka.configuration.requeue_on_error && !delivery_info.redelivered
- logf "Retrying an Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error
- receiver.nack_requeue(delivery_info.delivery_tag)
- else
- logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error
- receiver.nack(delivery_info.delivery_tag)
- end
+ logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error
+ # no requeue
+ receiver.nack(delivery_info.delivery_tag)
end
end
rescue Interrupt => _
# TODO: trap TERM within channel.work_pool
info "Interrupting queue #{queue_name} subscriber safely"
@@ -46,11 +42,11 @@
return receiver
end
def topic_queue
@topic_queue ||= begin
- queue = channel.queue(queue_name, durable: true, arguments: {})
+ queue = channel.queue(queue_name, durable: true)
queue.bind(topic_exchange, routing_key: routing_key)
queue
end
end
@@ -65,17 +61,17 @@
def nack_requeue(delivery_tag)
channel.nack(delivery_tag, false, true)
end
def drop_queue
- topic_queue.purge
+ topic_queue.purge
topic_queue.delete
end
def test_teardown
drop_queue
topic_exchange.delete
- channel_connector.connection_close
+ channel_connector.connection_close
end
end
end