Sha256: 6c6e33f9d41078a168d4ab16bcc58127f16568e509b75a0035b2787704a5a32c

Contents?: true

Size: 1.75 KB

Versions: 3

Compression:

Stored size: 1.75 KB

Contents

module Jackhammer
  class Topic
    QUEUE_NAME_KEY = 'queue_name'.freeze
    ROUTING_KEY_KEY = 'routing_key'.freeze

    def initialize(name:, queue_config:, options: {})
      @topic = Jackhammer.channel.topic(name, options)
      @queue_config = normalize_queue_config(queue_config)
    end

    def subscribe_queues
      queues.each(&:subscribe)
    end

    # We're expecting the client to specify at least the routing_key in options
    # for each message published.
    def publish(message, options)
      Jackhammer.client_middleware.call(message, Jackhammer.publish_options(options)) do |msg, opts|
        topic.publish msg, opts
      end
    end

    def queues
      return @queues if @queues

      @queues = queue_config.map do |options|
        handler = MessageReceiver.new(options.delete('handler'))
        routing_key = fetch_and_delete_key(options, ROUTING_KEY_KEY)
        queue_name = options.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(routing_key)
        queue = Jackhammer.channel.queue(queue_name, options)
        Log.info { "'#{queue_name}' configured to subscribe on '#{routing_key}'" }
        Queue.new(topic: topic, queue: queue, handler: handler, routing_key: routing_key)
      end
    end

    private

    attr_reader :topic, :queue_config

    # `queue_config` can be either:
    # - an array of options containing `queue_name` key
    # or
    # - a hash containing `queue_name => options` pairs
    def normalize_queue_config(config)
      return config if config.is_a?(Array)

      config.map do |name, options|
        options[QUEUE_NAME_KEY] = name
        options
      end
    end

    def fetch_and_delete_key(options, key)
      options.delete(key) || fail(InvalidConfigError, "#{key} not found in #{options.inspect}")
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
jackhammer-1.5.1 lib/jackhammer/topic.rb
jackhammer-1.5.0 lib/jackhammer/topic.rb
jackhammer-1.5.0.rc lib/jackhammer/topic.rb