Sha256: e5f3c19ab2457e78ddb8a9f9405d3eddcde0ce2af502142eaa9f8366fd665f79

Contents?: true

Size: 1.99 KB

Versions: 1

Compression:

Stored size: 1.99 KB

Contents

require "harmoniser/channelable"
require "harmoniser/definition"

module Harmoniser
  # Mixin that gives an including class a class-level API for publishing
  # messages to a single exchange.
  #
  # Including the module extends the class with ClassMethods and defines a
  # private, per-class mutex constant used to serialise publishing.
  #
  # @example
  #   class MyPublisher
  #     include Harmoniser::Publisher
  #     harmoniser_publisher exchange_name: "my_exchange"
  #   end
  #
  #   MyPublisher.publish("payload")
  module Publisher
    # Raised by +publish+ when +harmoniser_publisher+ has not been called on
    # the class.
    class MissingExchangeDefinition < StandardError; end
    include Channelable

    # Class-level methods added to every class that includes Publisher.
    module ClassMethods
      # Declares the exchange this class publishes to.
      #
      # The definition is stored with +type: nil+ and +opts: {passive: true}+;
      # the exchange object itself is only built on the first +publish+.
      # Calling this again replaces the definition and discards any exchange
      # memoised for the previous one.
      #
      # @param exchange_name [String] name of the exchange to publish to
      # @return [Definition::Exchange] the stored exchange definition
      def harmoniser_publisher(exchange_name:)
        @harmoniser_exchange_definition = Definition::Exchange.new(
          name: exchange_name,
          type: nil,
          opts: {passive: true}
        )
        # Without this reset a class that re-declares its exchange after
        # having published would keep sending to the previously memoised
        # exchange while the debug log reports the new name.
        # NOTE(review): the channel of the discarded exchange is not closed
        # here — confirm who owns the channel lifecycle.
        @harmoniser_exchange = nil
        @harmoniser_exchange_definition
      end

      # Publishes +payload+ to the declared exchange.
      #
      # The exchange lookup (and lazy creation) plus the publish call run
      # while holding the class' HARMONISER_PUBLISHER_MUTEX.
      #
      # @param payload [Object] message body, forwarded to the exchange as is
      # @param opts [Hash] publish options, forwarded to the exchange as is
      # @return [Bunny::Exchange] the exchange the message was published to
      # @raise [MissingExchangeDefinition] if +harmoniser_publisher+ was never
      #   called on this class
      def publish(payload, opts = {})
        raise_missing_exchange_definition unless @harmoniser_exchange_definition

        # Capture the exchange inside the critical section so the returned
        # object is the one actually published to, and so the `||=` memo is
        # never evaluated outside the lock.
        exchange = const_get(:HARMONISER_PUBLISHER_MUTEX).synchronize do
          harmoniser_exchange.tap { |target| target.publish(payload, opts) }
        end
        Harmoniser.logger.debug { "Message published: exchange = `#{@harmoniser_exchange_definition.name}`, payload = `#{payload}`, opts = `#{opts}`" }

        exchange
      end

      private

      # Memoised exchange for the current definition; built on first use.
      def harmoniser_exchange
        @harmoniser_exchange ||= create_exchange
      end

      # Builds the exchange on a channel obtained from Publisher.create_channel
      # (presumably provided through Channelable — confirm) and installs the
      # default on_return handler on it.
      def create_exchange
        # Read the definition once so a single exchange is always built from a
        # single definition.
        definition = @harmoniser_exchange_definition
        exchange = Bunny::Exchange.new(
          Publisher.create_channel,
          definition.type,
          definition.name,
          definition.opts
        )
        handle_return(exchange)
        exchange
      end

      # `name` is the including class' name, so the message points at the
      # class that forgot the declaration.
      def raise_missing_exchange_definition
        raise MissingExchangeDefinition, "Please call the harmoniser_publisher class method at `#{name}` with the exchange_name that will be used for publishing"
      end

      # Registers an on_return callback on +exchange+ that only logs a warning
      # with the return details.
      def handle_return(exchange)
        exchange.on_return do |basic_return, properties, payload|
          Harmoniser.logger.warn("Default on_return handler executed for exchange: basic_return = `#{basic_return}`, properties = `#{properties}`, payload = `#{payload}`")
        end
      end
    end

    class << self
      # Hook run when Publisher is included: gives +base+ its own private
      # mutex constant and extends it with ClassMethods.
      def included(base)
        base.const_set(:HARMONISER_PUBLISHER_MUTEX, Mutex.new)
        base.private_constant(:HARMONISER_PUBLISHER_MUTEX)
        base.extend(ClassMethods)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
harmoniser-0.10.0 lib/harmoniser/publisher.rb