lib/vx/consumer/subscribe.rb in vx-consumer-0.1.4 vs lib/vx/consumer/subscribe.rb in vx-consumer-0.1.5
- old
+ new
@@ -1,11 +1,11 @@
module Vx
module Consumer
module Subscribe
- def subscribe
- ch, q = bind
+ def subscribe(options = {})
+ ch, q = bind(options)
subscriber = Subscriber.new(
ch,
q,
ch.generate_consumer_tag,
@@ -52,11 +52,12 @@
def decode_payload(properties, payload)
Serializer.unpack(properties[:content_type], payload, params.model)
end
- def bind
+ def bind(options = {})
+ qname = options[:queue] || params.queue_options
instrumentation = {
consumer: params.consumer_name
}
@@ -65,10 +66,10 @@
ch = session.conn.create_channel
session.assign_error_handlers_to_channel(ch)
ch.prefetch configuration.prefetch
x = session.declare_exchange ch, params.exchange_name, params.exchange_options
- q = session.declare_queue ch, params.queue_name, params.queue_options
+ q = session.declare_queue ch, qname, params.queue_options
instrumentation.merge!(
exchange: x.name,
queue: q.name,
queue_options: params.queue_options,