lib/cotton_tail/queue/bunny.rb in cotton-tail-0.1.2 vs lib/cotton_tail/queue/bunny.rb in cotton-tail-0.2.0
- old
+ new
@@ -1,48 +1,66 @@
# frozen_string_literal: true
+require 'forwardable'
+require 'bunny'
+
module CottonTail
module Queue
# A wrapper around a ::Bunny::Queue that makes it interchangeable with a
- # standard Ruby::Queue
- class Bunny
+ # standard Ruby Queue
+ class Bunny < SimpleDelegator
+ extend Forwardable
+
def self.call(name:, **opts)
new(name, **opts)
end
- def initialize(name, conn: Connection.new, prefetch: 1, manual_ack: false, **opts)
+ def initialize(name, manual_ack: false, **opts)
+ super ::Queue.new
+
@name = name
- @prefetch = prefetch
- @conn = conn
- @queue = conn.queue(@name, **opts)
- @messages = ::Queue.new
- @queue.subscribe(manual_ack: manual_ack) { |*args| @messages << args }
- end
+ @source_opts = opts
- def bind(routing_key)
- @queue.bind('amq.topic', routing_key: routing_key)
+ watch_source manual_ack
end
def push(args)
routing_key, message = args
- @conn.publish message, routing_key: routing_key
+ bind routing_key
+ exchange.publish message, routing_key: routing_key
end
- def close
- @messages.close
+ def pop
+ delivery_info, *tail = super
+ [delivery_info[:routing_key], delivery_info, *tail]
end
- def closed?
- @messages.closed?
+ private
+
+ def_delegator :'CottonTail.configuration', :connection_args
+
+ def bind(routing_key)
+ source.bind('amq.topic', routing_key: routing_key)
end
- def empty?
- @messages.empty?
+ def watch_source(manual_ack)
+ source.subscribe(manual_ack: manual_ack) { |*args| self << args }
end
- def pop
- delivery_info, *tail = @messages.pop
- [delivery_info[:routing_key], delivery_info, *tail, conn: @conn]
+ def connection
+ @connection ||= ::Bunny.new(*connection_args).start
+ end
+
+ def source
+ @source ||= channel.queue(@name, **@source_opts)
+ end
+
+ def channel
+ @channel ||= connection.create_channel
+ end
+
+ def exchange
+ @exchange ||= channel.exchange('amq.topic')
end
end
end
end