Sha256: 04bac838b96d0f6f4eacd94fb38ab86eceaabaa7c01c58c5cde0883fbf059b2c

Contents?: true

Size: 1.36 KB

Versions: 2

Compression:

Stored size: 1.36 KB

Contents

# frozen_string_literal: true

require 'forwardable'
require 'bunny'

module CottonTail
  module Queue
    # A wrapper around a ::Bunny::Queue that makes it interchangeable with a
    # standard Ruby Queue
    class Bunny < SimpleDelegator
      extend Forwardable

      def self.call(**opts)
        new(**opts)
      end

      def initialize(name:, connection:, manual_ack: false, **opts)
        super ::Queue.new

        @name = name
        @source_opts = opts
        @connection = connection

        watch_source manual_ack
      end

      def push(request)
        bind request.routing_key
        exchange.publish request.payload, routing_key: request.routing_key
      end

      def pop
        delivery_info, properties, payload = super
        Request.new(delivery_info, MessageProperties.new(properties.to_h), payload)
      end

      def bind(routing_key)
        source.bind('amq.topic', routing_key: Route.new(routing_key).binding)
      end

      private

      attr_reader :connection

      def watch_source(manual_ack)
        source.subscribe(manual_ack: manual_ack) { |*args| self << args }
      end

      def source
        @source ||= channel.queue(@name, **@source_opts)
      end

      def channel
        @channel ||= connection.create_channel
      end

      def exchange
        @exchange ||= channel.exchange('amq.topic')
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
cotton-tail-0.6.1 lib/cotton_tail/queue/bunny.rb
cotton-tail-0.6.0 lib/cotton_tail/queue/bunny.rb