Sha256: a8848a471e58163a85f21037c036e47fc52946973566e8124db7537582fd72bb

Contents?: true

Size: 1.49 KB

Versions: 1

Compression:

Stored size: 1.49 KB

Contents

module Basquiat
  module Adapters
    class RabbitMq
      # Wraps a broker connection and lazily builds the channel, queue and
      # topic exchange described by the session options.
      #
      # NOTE(review): the connection is presumably a Bunny session; this class
      # only relies on it responding to #connected?, #start and #create_channel.
      class Session
        # @param connection [#connected?, #start, #create_channel] broker connection
        # @param session_options [Hash] settings read by this class:
        #   * +:queue+     => +{ name:, options: }+ (options forwarded as queue arguments)
        #   * +:exchange+  => +{ name:, options: }+ (options forwarded as exchange arguments)
        #   * +:publisher+ => +{ confirm: }+ (optional; truthy enables confirm_select)
        def initialize(connection, session_options = {})
          @connection = connection
          @options    = session_options
        end

        # Binds the session queue to the session exchange.
        #
        # @param routing_key [String] routing key used for the binding
        def bind_queue(routing_key)
          queue.bind(exchange, routing_key: routing_key)
        end

        # Encodes +message+ with Basquiat::Json and publishes it on the exchange.
        #
        # @param routing_key [String] routing key for the published message
        # @param message [Object] payload handed to Basquiat::Json.encode
        # @param props [Hash] extra publish properties; they are merged last, so
        #   they override the default +:routing_key+ and +:timestamp+ entries
        # @return whatever the exchange's #publish returns
        def publish(routing_key, message, props = {})
          # NOTE(review): confirm_select runs on every publish when enabled;
          # confirm that the client treats repeated calls as harmless.
          channel.confirm_select if publisher_confirms?
          exchange.publish(Basquiat::Json.encode(message),
                           { routing_key: routing_key,
                             timestamp:   Time.now.to_i }.merge(props))
        end

        # Subscribes to the queue with manual acknowledgements and yields each
        # delivery wrapped in a Basquiat::Adapters::RabbitMq::Message.
        #
        # @param lock [Boolean] forwarded as the +block:+ option of queue.subscribe
        # @yieldparam message [Basquiat::Adapters::RabbitMq::Message]
        def subscribe(lock, &_block)
          queue.subscribe(block: lock, manual_ack: true) do |di, props, msg|
            message = Basquiat::Adapters::RabbitMq::Message.new(msg, di, props)
            yield message
          end
        end

        # Starts the connection when it is not connected yet and returns the
        # memoized channel.
        def channel
          @connection.start unless @connection.connected?
          @channel ||= @connection.create_channel
        end

        # Memoized durable queue named by +options[:queue][:name]+.
        def queue
          @queue ||= channel.queue(@options[:queue][:name],
                                   durable:   true,
                                   arguments: (@options[:queue][:options] || {}))
        end

        # Memoized durable topic exchange named by +options[:exchange][:name]+.
        def exchange
          @exchange ||= channel.topic(@options[:exchange][:name],
                                      durable:   true,
                                      arguments: (@options[:exchange][:options] || {}))
        end

        private

        # Publisher confirms are opt-in: a missing (or nil) +:publisher+ section
        # means "disabled" instead of raising NoMethodError on nil.
        def publisher_confirms?
          (@options[:publisher] || {})[:confirm]
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
basquiat-1.2.0 lib/basquiat/adapters/rabbitmq/session.rb