lib/pwwka/receiver.rb in pwwka-0.16.1 vs lib/pwwka/receiver.rb in pwwka-0.17.0
- old
+ new
@@ -15,18 +15,22 @@
@channel_connector = ChannelConnector.new(prefetch: prefetch)
@channel = @channel_connector.channel
@topic_exchange = @channel_connector.topic_exchange
end
- def self.subscribe(handler_klass, queue_name, routing_key: "#.#", block: true, prefetch: Pwwka.configuration.default_prefetch)
+ def self.subscribe(handler_klass, queue_name,
+ routing_key: "#.#",
+ block: true,
+ prefetch: Pwwka.configuration.default_prefetch,
+ payload_parser: Pwwka.configuration.payload_parser)
raise "#{handler_klass.name} must respond to `handle!`" unless handler_klass.respond_to?(:handle!)
receiver = new(queue_name, routing_key, prefetch: prefetch)
begin
info "Receiving on #{queue_name}"
receiver.topic_queue.subscribe(manual_ack: true, block: block) do |delivery_info, properties, payload|
begin
- payload = ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(payload))
+ payload = payload_parser.(payload)
handler_klass.handle!(delivery_info, properties, payload)
receiver.ack(delivery_info.delivery_tag)
logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
rescue => exception
Pwwka::ErrorHandlers::Chain.new(
@@ -76,8 +80,7 @@
def test_teardown
drop_queue
topic_exchange.delete
channel_connector.connection_close
end
-
end
end