Sha256: 9bafff53ca2841d534054267d44ed4797bfd4437cfa7ad801443c72cdeb1989a

Contents?: true

Size: 1.93 KB

Versions: 13

Compression:

Stored size: 1.93 KB

Contents

# frozen_string_literal: true

require 'bunny'

module Msgr
  # rubocop:disable Metrics/ClassLength
  class Connection
    include Logging

    attr_reader :uri, :config

    def initialize(uri, config, dispatcher)
      @uri        = uri
      @config     = config
      @dispatcher = dispatcher
      @channels   = []
    end

    def running?
      bindings.any?
    end

    def publish(message, opts = {})
      opts[:routing_key] = opts.delete(:to) if opts[:to]

      exchange.publish message.to_s, opts.merge(persistent: true)

      log(:debug) { "Published message to #{opts[:routing_key]}" }
    end

    def connection
      @connection ||= ::Bunny.new(config).tap(&:start)
    end

    def connect
      connection
    end

    def channel(prefetch: 1)
      channel = Msgr::Channel.new(config, connection)
      channel.prefetch(prefetch)
      @channels << channel
      channel
    end

    def exchange
      @exchange ||= channel.exchange
    end

    def release
      return if bindings.empty?
      log(:debug) { "Release bindings (#{bindings.size})..." }

      bindings.each(&:release)
    end

    def delete
      return if bindings.empty?
      log(:debug) { "Delete bindings (#{bindings.size})..." }

      bindings.each(&:delete)
    end

    def purge(**kwargs)
      return if bindings.empty?
      log(:debug) { "Purge bindings (#{bindings.size})..." }

      bindings.each {|b| b.purge(**kwargs) }
    end

    def bindings
      @bindings ||= []
    end

    def bind(routes)
      if routes.empty?
        log(:warn) do
          "No routes to bound to. Bind will have no effect:\n" \
          "  #{routes.inspect}"
        end
      else
        bind_all(routes)
      end
    end

    def close
      @channels.each(&:close)
      connection.close if @connection
      log(:debug) { 'Closed.' }
    end

    private

    def bind_all(routes)
      routes.each {|route| bindings << Binding.new(self, route, @dispatcher) }
    end
  end
end

Version data entries

13 entries across 13 versions & 1 rubygems

Version Path
msgr-1.2.0 lib/msgr/connection.rb
msgr-1.1.0.1.b306 lib/msgr/connection.rb
msgr-1.1.0.1.b305 lib/msgr/connection.rb
msgr-1.1.0.1.b302 lib/msgr/connection.rb
msgr-1.1.0.1.b301 lib/msgr/connection.rb
msgr-1.1.0.1.b300 lib/msgr/connection.rb
msgr-1.1.0.1.b297 lib/msgr/connection.rb
msgr-1.1.0.1.b296 lib/msgr/connection.rb
msgr-1.1.0.1.b295 lib/msgr/connection.rb
msgr-1.1.0.1.b292 lib/msgr/connection.rb
msgr-1.1.0.1.b291 lib/msgr/connection.rb
msgr-1.1.0.1.b288 lib/msgr/connection.rb
msgr-1.1.0.1.b285 lib/msgr/connection.rb