Sha256: 179aa3e2dce6ea485123eb5cfa25da84211a283a62c4d006f70c623b44b5284f

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

# frozen_string_literal: true

require "rabbit/publishing/message"

module Rabbit
  module Publishing
    autoload :Job, "rabbit/publishing/job"
    autoload :ChannelsPool, "rabbit/publishing/channels_pool"
    extend self

    MUTEX = Mutex.new

    def publish(msg)
      return if Rabbit.config.skip_publish?

      pool.with_channel msg.confirm_select? do |ch|
        ch.basic_publish *msg.basic_publish_args

        raise MessageNotDelivered, "RabbitMQ message not delivered: #{msg}" \
          if msg.confirm_select? && !ch.wait_for_confirms

        log msg
      end
    rescue Timeout::Error
      raise MessageNotDelivered, <<~MESSAGE
        Timeout while sending message #{msg}. Possible reasons:
          - #{msg.real_exchange_name} exchange is not found
          - RabbitMQ is extremely high loaded
      MESSAGE
    end

    def pool
      MUTEX.synchronize { @pool ||= ChannelsPool.new(create_client) }
    end

    private

    def create_queue_if_not_exists(channel, message)
      channel.queue(message.routing_key, durable: true)
    end

    def create_client
      config = Rails.application.config_for("sneakers") rescue {}
      config = config["bunny_options"].to_h.symbolize_keys

      Bunny.new(config).start
    end

    def log(message)
      @logger ||= Rabbit.config.publish_logger

      headers = [
        message.real_exchange_name, message.routing_key, message.event,
        message.confirm_select? ? "confirm" : "no-confirm"
      ]

      @logger.debug "#{headers.join ' / '}: #{message.data}"
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rabbit_messaging-0.9.0 lib/rabbit/publishing.rb