Sha256: 6a0e3d45bb40c45532e3a73d2d46c768c9f08c530e6fd69815535c4212ebd88e

Contents?: true

Size: 1.13 KB

Versions: 2

Compression:

Stored size: 1.13 KB

Contents

module EventBus
  module Broker
    # Declares a durable queue for a routing key, binds it to the topic
    # returned by Rabbit::Topic.topic, and hands every delivery to a handler
    # block wrapped in an EventBus::Event.
    class Rabbit::Queue
      # Stores the given object as the channel and sets its prefetch to 1.
      #
      # @param connection [#prefetch, #queue] object used as the channel.
      #   NOTE(review): despite the parameter name this is used as a channel
      #   (it receives +prefetch+ and +queue+) — confirm what callers pass.
      def initialize(connection)
        @channel = connection
        # NOTE(review): presumably caps the channel at one unacknowledged
        # delivery at a time — confirm against the broker client in use.
        @channel.prefetch(1)
      end

      # Shortcut for +new(connection).subscribe(routing_key, &block)+.
      #
      # @param connection [#prefetch, #queue] see #initialize
      # @param routing_key [String] see #subscribe
      # @yield [event, channel, delivery_info] see #subscribe
      # @raise [ArgumentError] when no block is given (raised by #subscribe)
      # @return [Object] whatever #subscribe returns
      def self.subscribe(connection, routing_key, &block)
        new(connection).subscribe(routing_key, &block)
      end

      # Declares the queue for +routing_key+, binds it to the topic and starts
      # consuming with manual acknowledgements.
      #
      # @param routing_key [String] binding key; also used (downcased) to build
      #   the queue name
      # @yield [event, channel, delivery_info] invoked once per delivery
      # @yieldparam event [EventBus::Event] built from the delivery's routing
      #   key and payload
      # @yieldparam channel [Object] the channel this queue was created on
      # @yieldparam delivery_info [Object] delivery metadata from the client.
      #   NOTE(review): channel and delivery_info are presumably passed so the
      #   handler can acknowledge the message itself — confirm with callers.
      # @raise [ArgumentError] when no block is given
      # @return [Object] the result of the underlying queue +subscribe+ call
      def subscribe(routing_key, &block)
        # Fail before anything is declared on the broker: without a handler
        # every delivery would hit +nil.call+ and never be acknowledged.
        raise ArgumentError, 'a handler block is required' unless block

        name = queue_name(routing_key)

        channel.queue(name, queue_options)
          .bind(topic, routing_key: routing_key)
          .subscribe(manual_ack: true) do |delivery_info, properties, payload|
            callback(delivery_info, properties, payload, &block)
          end
      end

      private

      attr_reader :channel

      # Wraps one delivery in an EventBus::Event and passes it to the handler.
      # The event is named after the delivery's own routing key, not the
      # binding key given to #subscribe. Message properties are not used.
      def callback(delivery_info, _properties, payload, &block)
        event = EventBus::Event.new(delivery_info.routing_key, payload)

        block.call(event, channel, delivery_info)
      end

      # Topic the queue is bound to, looked up through Rabbit::Topic.
      def topic
        Rabbit::Topic.topic(channel)
      end

      # Options passed when declaring the queue.
      def queue_options
        { durable: true }
      end

      # Queue name in the form "<app name>-<routing key>", both downcased.
      def queue_name(routing_key)
        "#{EventBus::Config::APP_NAME.downcase}-#{routing_key.downcase}"
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
event_bus_rb-2.0.2 lib/event_bus/broker/rabbit/queue.rb
event_bus_rb-2.0.1 lib/event_bus/broker/rabbit/queue.rb