Sha256: df659b1a217088bc89df96e20d9cce17b6561ea14ac6eb179b8070ef784da17a

Contents?: true

Size: 1.31 KB

Versions: 1

Compression:

Stored size: 1.31 KB

Contents

require 'securerandom'

module Vx
  module Consumer
    module Publish

      def publish(payload, options = {})
        session.open

        options ||= {}
        options[:routing_key]  = params.routing_key if params.routing_key && !options.key?(:routing_key)
        options[:headers]      = params.headers     if params.headers && !options.key?(:headers)

        options[:content_type] ||= params.content_type || configuration.content_type
        options[:message_id]   ||= SecureRandom.uuid

        name = params.exchange_name

        instrumentation = {
          payload:     payload,
          exchange:    name,
          consumer:    params.consumer_name,
          properties:  options,
        }

        with_middlewares :pub, instrumentation do
          with_channel do |ch|
            instrument("process_publishing", instrumentation.merge(channel: ch.id)) do
              encoded = encode_payload(payload, options[:content_type])
              x = session.declare_exchange ch, name, params.exchange_options
              x.publish encoded, options
            end
          end
        end
      end

      private

        def with_channel
          yield session.pub_channel
        end

        def encode_payload(payload, content_type)
          Serializer.pack(content_type, payload)
        end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
vx-consumer-0.1.3 lib/vx/consumer/publish.rb