Sha256: 04bac838b96d0f6f4eacd94fb38ab86eceaabaa7c01c58c5cde0883fbf059b2c
Contents?: true
Size: 1.36 KB
Versions: 2
Compression:
Stored size: 1.36 KB
Contents
# frozen_string_literal: true

require 'delegate'
require 'forwardable'
require 'bunny'

module CottonTail
  module Queue
    # A wrapper around a ::Bunny::Queue that makes it interchangeable with a
    # standard Ruby Queue.
    #
    # The instance delegates to an in-memory ::Queue. Messages delivered by the
    # broker to the underlying Bunny queue (the "source") are appended to that
    # in-memory queue by a subscription set up in the constructor, and are then
    # retrieved with {#pop}.
    class Bunny < SimpleDelegator
      extend Forwardable

      # Name of the exchange used both for publishing and for queue bindings.
      EXCHANGE_NAME = 'amq.topic'

      # Builds a new instance, forwarding all keyword options to {#initialize}.
      #
      # @param opts [Hash] keyword options accepted by {#initialize}
      # @return [CottonTail::Queue::Bunny]
      def self.call(**opts)
        new(**opts)
      end

      # @param name [String] name handed to the channel when declaring the
      #   source queue
      # @param connection [#create_channel] connection used to open the channel
      #   (presumably a ::Bunny::Session -- confirm against callers)
      # @param manual_ack [Boolean] forwarded to the source subscription
      # @param opts [Hash] remaining options, passed through unchanged when the
      #   source queue is declared
      def initialize(name:, connection:, manual_ack: false, **opts)
        super ::Queue.new
        @name = name
        @source_opts = opts
        @connection = connection
        # Subscribing last: it lazily creates the channel and source queue,
        # which read the instance variables assigned above.
        watch_source manual_ack
      end

      # Publishes a request to the exchange. The source queue is first bound
      # to the request's routing key.
      #
      # @param request [#routing_key, #payload] message to publish
      # @return [Object] whatever the exchange's +publish+ returns
      def push(request)
        bind request.routing_key
        exchange.publish request.payload, routing_key: request.routing_key
      end

      # Removes the next delivery from the wrapped in-memory queue and wraps it
      # in a Request.
      #
      # @return [Request]
      def pop
        # `super` reaches the wrapped ::Queue#pop through the delegator; each
        # entry is the argument triple stored by the subscription block.
        delivery_info, properties, payload = super
        Request.new(delivery_info, MessageProperties.new(properties.to_h), payload)
      end

      # Binds the source queue to the exchange for the given routing key.
      #
      # @param routing_key [Object] value handed to Route.new; the resulting
      #   route's +binding+ is used as the binding key
      # @return [Object] whatever the source queue's +bind+ returns
      def bind(routing_key)
        source.bind(EXCHANGE_NAME, routing_key: Route.new(routing_key).binding)
      end

      private

      attr_reader :connection

      # Subscribes to the source queue and appends every delivery (as an array
      # of the block arguments) to the wrapped in-memory queue.
      def watch_source(manual_ack)
        source.subscribe(manual_ack: manual_ack) { |*args| self << args }
      end

      # Lazily declared queue on the channel, using the stored name and options.
      def source
        @source ||= channel.queue(@name, **@source_opts)
      end

      # Lazily created channel on the connection.
      def channel
        @channel ||= connection.create_channel
      end

      # Lazily resolved exchange that {#push} publishes to.
      def exchange
        @exchange ||= channel.exchange(EXCHANGE_NAME)
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
cotton-tail-0.6.1 | lib/cotton_tail/queue/bunny.rb |
cotton-tail-0.6.0 | lib/cotton_tail/queue/bunny.rb |