lib/cotton_tail/queue/bunny.rb in cotton-tail-0.1.2 vs lib/cotton_tail/queue/bunny.rb in cotton-tail-0.2.0

- old
+ new

@@ -1,48 +1,66 @@ # frozen_string_literal: true +require 'forwardable' +require 'bunny' + module CottonTail module Queue # A wrapper around a ::Bunny::Queue that makes it interchangeable with a - # standard Ruby::Queue - class Bunny + # standard Ruby Queue + class Bunny < SimpleDelegator + extend Forwardable + def self.call(name:, **opts) new(name, **opts) end - def initialize(name, conn: Connection.new, prefetch: 1, manual_ack: false, **opts) + def initialize(name, manual_ack: false, **opts) + super ::Queue.new + @name = name - @prefetch = prefetch - @conn = conn - @queue = conn.queue(@name, **opts) - @messages = ::Queue.new - @queue.subscribe(manual_ack: manual_ack) { |*args| @messages << args } - end + @source_opts = opts - def bind(routing_key) - @queue.bind('amq.topic', routing_key: routing_key) + watch_source manual_ack end def push(args) routing_key, message = args - @conn.publish message, routing_key: routing_key + bind routing_key + exchange.publish message, routing_key: routing_key end - def close - @messages.close + def pop + delivery_info, *tail = super + [delivery_info[:routing_key], delivery_info, *tail] end - def closed? - @messages.closed? + private + + def_delegator :'CottonTail.configuration', :connection_args + + def bind(routing_key) + source.bind('amq.topic', routing_key: routing_key) end - def empty? - @messages.empty? + def watch_source(manual_ack) + source.subscribe(manual_ack: manual_ack) { |*args| self << args } end - def pop - delivery_info, *tail = @messages.pop - [delivery_info[:routing_key], delivery_info, *tail, conn: @conn] + def connection + @connection ||= ::Bunny.new(*connection_args).start + end + + def source + @source ||= channel.queue(@name, **@source_opts) + end + + def channel + @channel ||= connection.create_channel + end + + def exchange + @exchange ||= channel.exchange('amq.topic') end end end end