Sha256: 8dea3728e376072ee235fda5c5d32e0bcb1259a30c3cc297a045da7e0d678190

Contents?: true

Size: 935 Bytes

Versions: 6

Compression:

Stored size: 935 Bytes

Contents

# frozen_string_literal: true

module Jackhammer
  class Topic
    def initialize(name:, options:, queue_config:)
      @topic = Jackhammer.channel.topic name, options
      @queue_config = queue_config
    end

    def subscribe_queues
      queues.each(&:subscribe)
    end

    # We're expecting the client to specify at least the routing_key in options
    # for each message published.
    def publish(message, options)
      full_options = Jackhammer.configuration.publish_options.dup.merge options
      @topic.publish message, full_options
    end

    def queues
      return @queues if @queues

      @queues = @queue_config.map do |name, options|
        handler = MessageReceiver.new(options.delete('handler'))
        routing = options.delete 'routing_key'
        queue = Jackhammer.channel.queue name, options
        Queue.new topic: @topic, queue: queue, handler: handler, routing: routing
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
jackhammer-0.2.3 lib/jackhammer/topic.rb
jackhammer-0.2.2 lib/jackhammer/topic.rb
jackhammer-0.2.1 lib/jackhammer/topic.rb
jackhammer-0.2.0 lib/jackhammer/topic.rb
jackhammer-0.1.1 lib/jackhammer/topic.rb
jackhammer-0.1.0 lib/jackhammer/topic.rb