Sha256: 17aeb8c5079f87cb1a14ade2aac79b9b842487fd7c1d0942a0d4a1eafa68f4d7

Contents?: true

Size: 1.93 KB

Versions: 1

Compression:

Stored size: 1.93 KB

Contents

require 'bunny'
require 'delegate'

module Basquiat
  module Adapters
    # The RabbitMQ adapter for Basquiat.
    #
    # The adapter lazily builds, and memoizes, three collaborators from its
    # Configuration: a Connection, a Session created on top of that connection
    # and a requeue strategy created with that session. Event handlers are
    # kept in an Events collection.
    class RabbitMq < Basquiat::Adapters::Base
      using Basquiat::HashRefinements

      # Avoid superclass mismatch errors
      require 'basquiat/adapters/rabbitmq/events'
      require 'basquiat/adapters/rabbitmq/message'
      require 'basquiat/adapters/rabbitmq/configuration'
      require 'basquiat/adapters/rabbitmq/connection'
      require 'basquiat/adapters/rabbitmq/session'
      require 'basquiat/adapters/rabbitmq/requeue_strategies'

      # Runs the base adapter initializer and then stores a fresh Events
      # collection in @procs, which is where the event handlers live.
      def initialize
        super
        @procs = Events.new
      end

      # Memoizes the adapter Configuration and merges the adapter options found
      # in Basquiat.configuration into it.
      #
      # NOTE(review): the other methods read @configuration directly, so this
      # has to run before them; presumably Base#initialize calls it - confirm.
      #
      # @return whatever Configuration#merge_user_options returns
      def base_options
        @configuration ||= Configuration.new
        @configuration.merge_user_options(Basquiat.configuration.adapter_options)
      end

      # Registers the handler for an event.
      #
      # @param event_name the key the handler is stored under
      # @param proc the callable invoked by #listen with the received message
      def subscribe_to(event_name, proc)
        procs[event_name] = proc
      end

      # Publishes a message through the session, inside the connection's
      # network failure handler.
      #
      # @param event the event being published, handed to Session#publish
      # @param message the payload, handed to Session#publish
      # @param persistent when falsy the adapter disconnects right after
      #   publishing; defaults to options[:publisher][:persistent]
      # @param props extra properties handed to Session#publish
      def publish(event, message, persistent: options[:publisher][:persistent], props: {})
        connection.with_network_failure_handler do
          session.publish(event, message, props)
          disconnect unless persistent
        end
      end

      # Binds a queue for every registered event key and subscribes to the
      # session. Each received message is run through the requeue strategy,
      # which is given a block that calls the handler stored under the
      # message's routing key.
      #
      # @param block passed straight to Session#subscribe (presumably whether
      #   the call blocks the current thread - confirm against Session)
      def listen(block: true)
        connection.with_network_failure_handler do
          procs.keys.each { |key| session.bind_queue(key) }
          session.subscribe(block) do |message|
            strategy.run(message) do
              procs[message.routing_key].call(message)
            end
          end
        end
      end

      # Disconnects from the broker and drops every memoized object that was
      # built on top of the connection, so the next use rebuilds them.
      #
      # The strategy is dropped too: it is created with the current session, so
      # keeping it would leave it bound to the session discarded here while
      # #session hands out a brand new one.
      def reset_connection
        connection.disconnect
        @connection = nil
        @session    = nil
        @strategy   = nil
      end

      alias_method :disconnect, :reset_connection

      # @return the memoized requeue strategy, an instance of the class given
      #   by the configuration, created with the current session
      def strategy
        @strategy ||= @configuration.strategy.new(session)
      end

      # @return [Session] the memoized session, created from the current
      #   connection and the configured session options
      def session
        @session ||= Session.new(connection, @configuration.session_options)
      end

      private

      # @return [Connection] the memoized connection, created from the
      #   configured connection options
      def connection
        @connection ||= Connection.new(@configuration.connection_options)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
basquiat-1.2.0 lib/basquiat/adapters/rabbitmq_adapter.rb