lib/basquiat/adapters/rabbitmq/session.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/session.rb in basquiat-1.3.0.pre.1
- old
+ new
@@ -1,45 +1,43 @@
module Basquiat
module Adapters
class RabbitMq
class Session
- def initialize(connection, session_options = {})
- @connection = connection
- @options = session_options
+ attr_reader :channel
+
+ def initialize(channel, session_options = {})
+ @channel = channel
+ @options = session_options
end
def bind_queue(routing_key)
queue.bind(exchange, routing_key: routing_key)
end
def publish(routing_key, message, props = {})
channel.confirm_select if @options[:publisher][:confirm]
exchange.publish(Basquiat::Json.encode(message),
{ routing_key: routing_key,
+ persistent: true,
timestamp: Time.now.to_i }.merge(props))
end
- def subscribe(lock, &_block)
- queue.subscribe(block: lock, manual_ack: true) do |di, props, msg|
- message = Basquiat::Adapters::RabbitMq::Message.new(msg, di, props)
- yield message
+ def subscribe(block: true, manual_ack: @options[:consumer][:manual_ack])
+ channel.prefetch(@options[:consumer][:prefetch])
+ queue.subscribe(block: block, manual_ack: manual_ack) do |di, props, msg|
+ yield Basquiat::Adapters::RabbitMq::Message.new(msg, di, props)
end
end
- def channel
- @connection.start unless @connection.connected?
- @channel ||= @connection.create_channel
- end
-
def queue
@queue ||= channel.queue(@options[:queue][:name],
- durable: true,
+ durable: @options[:queue][:durable],
arguments: (@options[:queue][:options] || {}))
end
def exchange
@exchange ||= channel.topic(@options[:exchange][:name],
- durable: true,
+ durable: @options[:exchange][:durable],
arguments: (@options[:exchange][:options] || {}))
end
end
end
end