lib/basquiat/adapters/rabbitmq/session.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/session.rb in basquiat-1.3.0.pre.1

- old
+ new

@@ -1,45 +1,43 @@ module Basquiat module Adapters class RabbitMq class Session - def initialize(connection, session_options = {}) - @connection = connection - @options = session_options + attr_reader :channel + + def initialize(channel, session_options = {}) + @channel = channel + @options = session_options end def bind_queue(routing_key) queue.bind(exchange, routing_key: routing_key) end def publish(routing_key, message, props = {}) channel.confirm_select if @options[:publisher][:confirm] exchange.publish(Basquiat::Json.encode(message), { routing_key: routing_key, + persistent: true, timestamp: Time.now.to_i }.merge(props)) end - def subscribe(lock, &_block) - queue.subscribe(block: lock, manual_ack: true) do |di, props, msg| - message = Basquiat::Adapters::RabbitMq::Message.new(msg, di, props) - yield message + def subscribe(block: true, manual_ack: @options[:consumer][:manual_ack]) + channel.prefetch(@options[:consumer][:prefetch]) + queue.subscribe(block: block, manual_ack: manual_ack) do |di, props, msg| + yield Basquiat::Adapters::RabbitMq::Message.new(msg, di, props) end end - def channel - @connection.start unless @connection.connected? - @channel ||= @connection.create_channel - end - def queue @queue ||= channel.queue(@options[:queue][:name], - durable: true, + durable: @options[:queue][:durable], arguments: (@options[:queue][:options] || {})) end def exchange @exchange ||= channel.topic(@options[:exchange][:name], - durable: true, + durable: @options[:exchange][:durable], arguments: (@options[:exchange][:options] || {})) end end end end