Sha256: 6c6e33f9d41078a168d4ab16bcc58127f16568e509b75a0035b2787704a5a32c
Contents?: true
Size: 1.75 KB
Versions: 3
Compression:
Stored size: 1.75 KB
Contents
module Jackhammer
  # Wraps a topic obtained from `Jackhammer.channel` together with the queues
  # that are configured to be bound to it.
  class Topic
    QUEUE_NAME_KEY = 'queue_name'.freeze
    ROUTING_KEY_KEY = 'routing_key'.freeze

    # @param name [String] topic name handed to `Jackhammer.channel.topic`
    # @param queue_config [Array<Hash>, Hash] per-queue settings; see
    #   #normalize_queue_config for the two accepted shapes. The object passed
    #   in is never modified.
    # @param options [Hash] passed through to `Jackhammer.channel.topic`
    def initialize(name:, queue_config:, options: {})
      @topic = Jackhammer.channel.topic(name, options)
      @queue_config = normalize_queue_config(queue_config)
    end

    # Calls `subscribe` on every configured queue.
    def subscribe_queues
      queues.each(&:subscribe)
    end

    # We're expecting the client to specify at least the routing_key in options
    # for each message published.
    #
    # The message and the options (after `Jackhammer.publish_options`) are run
    # through `Jackhammer.client_middleware`; whatever the middleware yields is
    # what gets published on the topic.
    def publish(message, options)
      Jackhammer.client_middleware.call(message, Jackhammer.publish_options(options)) do |msg, opts|
        topic.publish msg, opts
      end
    end

    # Builds the Queue objects described by the queue config. The result is
    # memoized, so the channel queues are only declared on the first call.
    #
    # @return [Array<Queue>]
    # @raise [InvalidConfigError] when an entry has no `routing_key`
    def queues
      @queues ||= queue_config.map { |options| build_queue(options) }
    end

    private

    attr_reader :topic, :queue_config

    # `queue_config` can be either:
    #   - an array of options containing `queue_name` key
    #   or
    #   - a hash containing `queue_name => options` pairs
    #
    # Always returns an array of fresh (shallow-copied) option hashes, so the
    # destructive key extraction in #build_queue never alters the caller's
    # configuration and the same config object can be reused for another Topic.
    def normalize_queue_config(config)
      return config.map(&:dup) if config.is_a?(Array)

      config.map { |name, options| options.merge(QUEUE_NAME_KEY => name) }
    end

    # Turns one normalized option hash into a Queue.
    #
    # `handler`, `routing_key` and `queue_name` are removed from `options`
    # before the remainder is passed to `Jackhammer.channel.queue`. `options`
    # is a private copy made by #normalize_queue_config, so deleting is safe.
    def build_queue(options)
      handler = MessageReceiver.new(options.delete('handler'))
      routing_key = fetch_and_delete_key(options, ROUTING_KEY_KEY)
      # An explicit queue name wins; otherwise one is derived from the routing key.
      queue_name = options.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(routing_key)
      queue = Jackhammer.channel.queue(queue_name, options)
      Log.info { "'#{queue_name}' configured to subscribe on '#{routing_key}'" }

      Queue.new(topic: topic, queue: queue, handler: handler, routing_key: routing_key)
    end

    # Removes `key` from `options` and returns its value.
    #
    # @raise [InvalidConfigError] when the key is absent (or its value is falsy)
    def fetch_and_delete_key(options, key)
      options.delete(key) || raise(InvalidConfigError, "#{key} not found in #{options.inspect}")
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
jackhammer-1.5.1 | lib/jackhammer/topic.rb |
jackhammer-1.5.0 | lib/jackhammer/topic.rb |
jackhammer-1.5.0.rc | lib/jackhammer/topic.rb |