lib/pwwka/receiver.rb in pwwka-0.6.0 vs lib/pwwka/receiver.rb in pwwka-0.7.0.RC1

- old
+ new

@@ -27,13 +27,17 @@ payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload)) handler_klass.handle!(delivery_info, properties, payload) receiver.ack(delivery_info.delivery_tag) logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key rescue => e - logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, at: :error - # no requeue - receiver.nack(delivery_info.delivery_tag) + if Pwwka.configuration.requeue_on_error && !delivery_info.redelivered + logf "Retrying an Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error + receiver.nack_requeue(delivery_info.delivery_tag) + else + logf "Error Processing Message on %{queue_name} -> %{payload}, %{routing_key}: %{exception}: %{backtrace}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key, exception: e, backtrace: e.backtrace.join(';'), at: :error + receiver.nack(delivery_info.delivery_tag) + end end end rescue Interrupt => _ # TODO: trap TERM within channel.work_pool info "Interrupting queue #{queue_name} subscriber safely" @@ -42,11 +46,11 @@ return receiver end def topic_queue @topic_queue ||= begin - queue = channel.queue(queue_name, durable: true) + queue = channel.queue(queue_name, durable: true, arguments: {}) queue.bind(topic_exchange, routing_key: routing_key) queue end end @@ -61,17 +65,17 @@ def nack_requeue(delivery_tag) channel.nack(delivery_tag, false, true) end def drop_queue - topic_queue.purge + topic_queue.purge topic_queue.delete end def test_teardown drop_queue topic_exchange.delete - channel_connector.connection_close + channel_connector.connection_close end end end