Sha256: 4a9c9710b853d1202d04e138cc16acd2721a4c8234813261a50704dd8a6a6be8
Contents?: true
Size: 1.73 KB
Versions: 1
Compression:
Stored size: 1.73 KB
Contents
module Jackhammer
  # Couples the topic obtained from `Jackhammer.channel.topic` with the queues
  # described by a queue configuration, and publishes messages through it.
  #
  # The configuration handed to the constructor is never modified: queues are
  # built from shallow copies, so the same configuration object can be reused
  # (e.g. to build another Topic) and may be frozen.
  class Topic
    QUEUE_NAME_KEY = 'queue_name'.freeze
    ROUTING_KEY_KEY = 'routing_key'.freeze
    HANDLER_KEY = 'handler'.freeze

    # name         - passed through to `Jackhammer.channel.topic`.
    # options      - passed through to `Jackhammer.channel.topic`.
    # queue_config - per-queue options; see `normalize_queue_config` for the
    #                accepted shapes. `nil` means "no queues".
    def initialize(name:, options:, queue_config:)
      @topic = Jackhammer.channel.topic name, options
      @queue_config = normalize_queue_config(queue_config)
    end

    # Calls `subscribe` on every configured queue.
    def subscribe_queues
      queues.each(&:subscribe)
    end

    # We're expecting the client to specify at least the routing_key in options
    # for each message published.
    #
    # `options` are merged over `Jackhammer.configuration.publish_options`, so
    # caller-supplied keys win. `merge` returns a new hash, which leaves the
    # configured defaults untouched.
    def publish(message, options)
      full_options = Jackhammer.configuration.publish_options.merge(options)
      topic.publish message, full_options
    end

    # Returns the Queue objects for this topic, building them on first use and
    # memoizing the result.
    #
    # Raises InvalidConfigError when a queue entry has no `routing_key`.
    def queues
      @queues ||= queue_config.map { |options| build_queue(options) }
    end

    private

    attr_reader :topic, :queue_config

    # `queue_config` can be either:
    # - an array of options containing `queue_name` key
    # or
    # - a hash containing `queue_name => options` pairs
    #
    # Always returns an array of option hashes. A `nil` config yields an empty
    # array. The hash form is converted with a non-mutating `merge`, so the
    # caller's option hashes are left as they were.
    def normalize_queue_config(config)
      return [] if config.nil?
      return config if config.is_a?(Array)

      config.map { |name, options| options.merge(QUEUE_NAME_KEY => name) }
    end

    # Builds one Queue from a single entry of `queue_config`.
    #
    # The handler, routing key and queue name are stripped from a shallow copy
    # of the entry; whatever remains is forwarded to `Jackhammer.channel.queue`.
    # When no queue name is configured, one is derived from the routing key via
    # `QueueName.from_routing_key`.
    def build_queue(options)
      # Work on a copy: the deletes below must not leak into the caller's config.
      remaining = options.dup
      handler = MessageReceiver.new(remaining.delete(HANDLER_KEY))
      routing_key = fetch_and_delete_key(remaining, ROUTING_KEY_KEY)
      queue_name = remaining.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(routing_key)
      queue = Jackhammer.channel.queue(queue_name, remaining)
      Log.info { "'#{queue_name}' configured to subscribe on '#{routing_key}'" }
      Queue.new(topic: topic, queue: queue, handler: handler, routing_key: routing_key)
    end

    # Removes `key` from `options` (mutating the hash it is given) and returns
    # its value. Raises InvalidConfigError when the key is absent or its value
    # is nil/false.
    def fetch_and_delete_key(options, key)
      options.delete(key) || raise(InvalidConfigError, "#{key} not found in #{options.inspect}")
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
jackhammer-1.2.0 | lib/jackhammer/topic.rb |