Sha256: e5f3c19ab2457e78ddb8a9f9405d3eddcde0ce2af502142eaa9f8366fd665f79
Contents?: true
Size: 1.99 KB
Versions: 1
Compression:
Stored size: 1.99 KB
Contents
require "harmoniser/channelable" require "harmoniser/definition" module Harmoniser module Publisher class MissingExchangeDefinition < StandardError; end include Channelable module ClassMethods def harmoniser_publisher(exchange_name:) @harmoniser_exchange_definition = Definition::Exchange.new( name: exchange_name, type: nil, opts: {passive: true} ) end def publish(payload, opts = {}) raise_missing_exchange_definition unless @harmoniser_exchange_definition const_get(:HARMONISER_PUBLISHER_MUTEX).synchronize do harmoniser_exchange.publish(payload, opts) end Harmoniser.logger.debug { "Message published: exchange = `#{@harmoniser_exchange_definition.name}`, payload = `#{payload}`, opts = `#{opts}`" } harmoniser_exchange end private def harmoniser_exchange @harmoniser_exchange ||= create_exchange end def create_exchange exchange = Bunny::Exchange.new( Publisher.create_channel, @harmoniser_exchange_definition.type, @harmoniser_exchange_definition.name, @harmoniser_exchange_definition.opts ) handle_return(exchange) exchange end def raise_missing_exchange_definition raise MissingExchangeDefinition, "Please call the harmoniser_publisher class method at `#{name}` with the exchange_name that will be used for publishing" end def handle_return(exchange) exchange.on_return do |basic_return, properties, payload| Harmoniser.logger.warn("Default on_return handler executed for exchange: basic_return = `#{basic_return}`, properties = `#{properties}`, payload = `#{payload}`") end end end class << self def included(base) base.const_set(:HARMONISER_PUBLISHER_MUTEX, Mutex.new) base.private_constant(:HARMONISER_PUBLISHER_MUTEX) base.extend(ClassMethods) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
harmoniser-0.10.0 | lib/harmoniser/publisher.rb |