lib/pwwka/receiver.rb in pwwka-0.16.1 vs lib/pwwka/receiver.rb in pwwka-0.17.0

- old
+ new

@@ -15,18 +15,22 @@ @channel_connector = ChannelConnector.new(prefetch: prefetch) @channel = @channel_connector.channel @topic_exchange = @channel_connector.topic_exchange end - def self.subscribe(handler_klass, queue_name, routing_key: "#.#", block: true, prefetch: Pwwka.configuration.default_prefetch) + def self.subscribe(handler_klass, queue_name, + routing_key: "#.#", + block: true, + prefetch: Pwwka.configuration.default_prefetch, + payload_parser: Pwwka.configuration.payload_parser) raise "#{handler_klass.name} must respond to `handle!`" unless handler_klass.respond_to?(:handle!) receiver = new(queue_name, routing_key, prefetch: prefetch) begin info "Receiving on #{queue_name}" receiver.topic_queue.subscribe(manual_ack: true, block: block) do |delivery_info, properties, payload| begin - payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload)) + payload = payload_parser.(payload) handler_klass.handle!(delivery_info, properties, payload) receiver.ack(delivery_info.delivery_tag) logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key rescue => exception Pwwka::ErrorHandlers::Chain.new( @@ -76,8 +80,7 @@ def test_teardown drop_queue topic_exchange.delete channel_connector.connection_close end - end end