Sha256: 6a0e3d45bb40c45532e3a73d2d46c768c9f08c530e6fd69815535c4212ebd88e
Contents?: true
Size: 1.13 KB
Versions: 2
Compression:
Stored size: 1.13 KB
Contents
module EventBus module Broker class Rabbit::Queue def initialize(connection) @channel = connection @channel.prefetch(1) end def self.subscribe(connection, routing_key, &block) new(connection).subscribe(routing_key, &block) end def subscribe(routing_key, &block) name = queue_name(routing_key) channel.queue(name, queue_options) .bind(topic, routing_key: routing_key) .subscribe(manual_ack: true) do |delivery_info, properties, payload| callback(delivery_info, properties, payload, &block) end end private attr_reader :channel def callback(delivery_info, properties, payload, &block) event_name = delivery_info.routing_key event = EventBus::Event.new(event_name, payload) block.call(event, channel, delivery_info) end def topic Rabbit::Topic.topic(channel) end def queue_options { durable: true } end def queue_name(routing_key) "#{EventBus::Config::APP_NAME.downcase}-#{routing_key.downcase}" end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
event_bus_rb-2.0.2 | lib/event_bus/broker/rabbit/queue.rb |
event_bus_rb-2.0.1 | lib/event_bus/broker/rabbit/queue.rb |