Sha256: a8848a471e58163a85f21037c036e47fc52946973566e8124db7537582fd72bb
Contents?: true
Size: 1.49 KB
Versions: 1
Compression:
Stored size: 1.49 KB
Contents
module Basquiat
  module Adapters
    class RabbitMq
      # Thin wrapper around a broker connection that lazily creates (and
      # memoizes) the channel, the durable queue and the durable topic
      # exchange described by the session options.
      class Session
        # @param connection [#connected?, #start, #create_channel] broker
        #   connection; it is started on demand the first time a channel is
        #   needed.
        # @param session_options [Hash] configuration read by this class:
        #   * +:queue+     - hash with +:name+ and optional +:options+
        #   * +:exchange+  - hash with +:name+ and optional +:options+
        #   * +:publisher+ - optional hash; a truthy +:confirm+ enables
        #     publisher confirms on the channel
        def initialize(connection, session_options = {})
          @connection = connection
          @options    = session_options
        end

        # Binds the session queue to the session exchange.
        #
        # @param routing_key [String] routing key used for the binding
        # @return whatever the queue's +bind+ returns
        def bind_queue(routing_key)
          queue.bind(exchange, routing_key: routing_key)
        end

        # Encodes +message+ with Basquiat::Json and publishes it to the
        # session exchange.
        #
        # @param routing_key [String] routing key for the message
        # @param message [Object] payload handed to Basquiat::Json.encode
        # @param props [Hash] extra publish properties; merged last, so they
        #   override the default +:routing_key+ and +:timestamp+
        # @return whatever the exchange's +publish+ returns
        def publish(routing_key, message, props = {})
          # A session created without a :publisher section simply publishes
          # without confirms instead of failing on a nil lookup.
          channel.confirm_select if publisher_confirms?

          defaults = { routing_key: routing_key, timestamp: Time.now.to_i }
          exchange.publish(Basquiat::Json.encode(message), defaults.merge(props))
        end

        # Subscribes to the session queue with manual acknowledgements and
        # yields every delivery, wrapped in a RabbitMq::Message, to the block
        # given to this method.
        #
        # @param lock [Boolean] forwarded as the +:block+ option of the
        #   queue's +subscribe+ (presumably whether the call blocks the
        #   calling thread - confirm against the client library)
        # @yieldparam message [Basquiat::Adapters::RabbitMq::Message]
        def subscribe(lock, &_block)
          queue.subscribe(block: lock, manual_ack: true) do |di, props, msg|
            message = Basquiat::Adapters::RabbitMq::Message.new(msg, di, props)
            yield message
          end
        end

        # Starts the connection if it is not connected yet and returns the
        # memoized channel, creating it on first use.
        def channel
          @connection.start unless @connection.connected?
          @channel ||= @connection.create_channel
        end

        # Returns the memoized durable queue declared from the +:queue+
        # options.
        def queue
          @queue ||= channel.queue(@options[:queue][:name],
                                   durable:   true,
                                   arguments: (@options[:queue][:options] || {}))
        end

        # Returns the memoized durable topic exchange declared from the
        # +:exchange+ options.
        def exchange
          @exchange ||= channel.topic(@options[:exchange][:name],
                                      durable:   true,
                                      arguments: (@options[:exchange][:options] || {}))
        end

        private

        # True when the options ask for publisher confirms. A missing or nil
        # +:publisher+ section counts as "disabled".
        def publisher_confirms?
          publisher_options = @options[:publisher] || {}
          publisher_options[:confirm] ? true : false
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
basquiat-1.2.0 | lib/basquiat/adapters/rabbitmq/session.rb |