Sha256: de4f514e14703c6cd83a9008beeb0e5342eaf67c0e7887aa90ace33bc92c959a

Contents?: true

Size: 1.12 KB

Versions: 2

Compression:

Stored size: 1.12 KB

Contents

module EventBus
  module Broker
    class Rabbit::Queue
      def initialize(channel)
        @channel = channel
        @channel.prefetch(1)
      end

      def self.subscribe(channel, routing_key, &block)
        new(channel).subscribe(routing_key, &block)
      end

      def subscribe(routing_key, &block)
        name = queue_name(routing_key)

        channel.queue(name, queue_options)
          .bind(topic, routing_key: routing_key)
          .subscribe(manual_ack: true) do |delivery_info, properties, payload|
            callback(delivery_info, properties, payload, &block)
          end
      end

      private

      attr_reader :channel

      def callback(delivery_info, properties, payload, &block)
        event_name = delivery_info.routing_key

        event = EventBus::Event.new(event_name, payload)

        block.call(event, channel, delivery_info)
      end

      def topic
        Rabbit::Topic.topic(channel)
      end

      def queue_options
        { durable: true }
      end

      def queue_name(routing_key)
        "#{EventBus::Config::APP_NAME.downcase}-#{routing_key.downcase}"
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
event_bus_rb-2.1.0 lib/event_bus/broker/rabbit/queue.rb
event_bus_rb-2.0.3 lib/event_bus/broker/rabbit/queue.rb