Sha256: d189126bfa6be040be4bd860d0b85a6de2a3057126e9513cb0980f539ebffae2

Contents?: true

Size: 1.66 KB

Versions: 8

Compression:

Stored size: 1.66 KB

Contents

# frozen_string_literal: true

module Basquiat
  module Adapters
    class RabbitMq
      # A RabbitMQ session: one channel plus the queue and exchange declared
      # on it, configured from a nested options hash.
      #
      # Option keys read by this class (every section and key is optional; a
      # missing one is read as nil):
      #
      #   exchange:  { name:, durable:, options: }
      #   queue:     { name:, durable:, options: }
      #   publisher: { confirm: }
      #   consumer:  { manual_ack:, prefetch: }
      class Session
        attr_reader :channel

        # @param channel [Object] object used to declare the queue/exchange and
        #   to set prefetch/confirm mode (presumably a Bunny::Channel; confirm
        #   against the caller)
        # @param session_options [Hash] nested options, see the class comment
        def initialize(channel, session_options = {})
          @channel = channel
          @options = session_options
        end

        # Binds the session queue to the session exchange.
        #
        # @param routing_key [String] key passed as the binding's :routing_key
        # @return [Object] whatever the queue's #bind returns
        def bind_queue(routing_key)
          queue.bind(exchange, routing_key: routing_key)
        end

        # Encodes +message+ with Basquiat::Json and publishes it on the exchange.
        #
        # @param routing_key [String] routing key for the message
        # @param message [Object] payload handed to Basquiat::Json.encode
        # @param props [Hash] extra publish properties; these win over the
        #   defaults (:routing_key, :persistent, :timestamp) on key collision
        # @return [Object] whatever the exchange's #publish returns
        def publish(routing_key, message, props = {})
          channel.confirm_select if option(:publisher, :confirm)

          defaults = { routing_key: routing_key,
                       persistent:  true,
                       timestamp:   Time.now.to_i }
          exchange.publish(Basquiat::Json.encode(message), defaults.merge(props))
        end

        # Subscribes to the session queue, yielding each delivery wrapped in a
        # Basquiat::Adapters::RabbitMq::Message.
        #
        # @param block [Boolean] forwarded to the queue's #subscribe as :block
        # @param manual_ack [Boolean, nil] forwarded as :manual_ack; defaults to
        #   the configured consumer manual_ack option
        # @yieldparam message [Basquiat::Adapters::RabbitMq::Message]
        # @return [Object] whatever the queue's #subscribe returns
        def subscribe(block: true, manual_ack: option(:consumer, :manual_ack))
          # Only touch the channel's prefetch setting when a value was
          # configured; passing nil through is never meaningful.
          prefetch = option(:consumer, :prefetch)
          channel.prefetch(prefetch) if prefetch

          queue.subscribe(block: block, manual_ack: manual_ack) do |di, props, msg|
            yield Basquiat::Adapters::RabbitMq::Message.new(msg, di, props)
          end
        end

        # Declares the queue on first use and memoizes it.
        #
        # @return [Object] the queue returned by the channel's #queue
        def queue
          @queue ||= channel.queue(option(:queue, :name),
                                   durable:   option(:queue, :durable),
                                   arguments: option(:queue, :options) || {})
        end

        # @return [String] the name reported by the declared queue
        def queue_name
          queue.name
        end

        # Declares the topic exchange on first use and memoizes it.
        #
        # @return [Object] the exchange returned by the channel's #topic
        def exchange
          @exchange ||= channel.topic(option(:exchange, :name),
                                      durable:   option(:exchange, :durable),
                                      arguments: option(:exchange, :options) || {})
        end

        private

        # Reads a nested option, returning nil (rather than raising) when the
        # whole section is absent.
        def option(section, key)
          @options.dig(section, key)
        end
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygem

Version Path
basquiat-1.6.0 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.5.2 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.5.1 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.5.0 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.4.0 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.3.6 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.3.5 lib/basquiat/adapters/rabbitmq/session.rb
basquiat-1.3.4 lib/basquiat/adapters/rabbitmq/session.rb