lib/pwwka/receiver.rb in pwwka-0.7.0.RC1 vs lib/pwwka/receiver.rb in pwwka-0.7.0

- old
+ new

@@ -27,17 +27,13 @@ payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload)) handler_klass.handle!(delivery_info, properties, payload) receiver.ack(delivery_info.delivery_tag) logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key rescue => e - if Pwwka.configuration.requeue_on_error && !delivery_info.redelivered - logf "Retrying an Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error - receiver.nack_requeue(delivery_info.delivery_tag) - else - logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error - receiver.nack(delivery_info.delivery_tag) - end + logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error + # no requeue + receiver.nack(delivery_info.delivery_tag) end end rescue Interrupt => _ # TODO: trap TERM within channel.work_pool info "Interrupting queue #{queue_name} subscriber safely" @@ -46,11 +42,11 @@ return receiver end def topic_queue @topic_queue ||= begin - queue = channel.queue(queue_name, durable: true, arguments: {}) + queue = channel.queue(queue_name, durable: true) queue.bind(topic_exchange, routing_key: routing_key) queue end end @@ -65,17 +61,17 @@ def nack_requeue(delivery_tag) channel.nack(delivery_tag, false, true) end def drop_queue - topic_queue.purge + topic_queue.purge topic_queue.delete end def test_teardown drop_queue topic_exchange.delete - channel_connector.connection_close + channel_connector.connection_close end end end