lib/rabbit_wq/work.rb in rabbit-wq-0.2.0 vs lib/rabbit_wq/work.rb in rabbit-wq-0.3.0

- old
+ new

@@ -10,39 +10,82 @@ def self.enqueue_payload( payload, options={} ) delay = options.delete( :delay ) delay = nil if delay && delay < 5000 - mq = ::Bunny.new.tap { |bunny| bunny.start } - channel = mq.create_channel - if delay - delay_x = channel.direct( "#{DELAY_EXCHANGE_PREFIX}-#{delay}ms", durable: true ) - work_x = channel.direct( WORK_EXCHANGE, durable: true ) + with_channel do |channel| + delay_x = channel.direct( "#{RabbitWQ.configuration.delayed_exchange_prefix}-#{delay}ms", durable: true ) + work_x = channel.direct( RabbitWQ.configuration.work_exchange, durable: true ) - channel.queue( "#{DELAY_QUEUE_PREFIX}-#{delay}ms", - durable: true, - arguments: { "x-dead-letter-exchange" => work_x.name, - "x-message-ttl" => delay } ). - bind( delay_x ) + channel.queue( "#{RabbitWQ.configuration.delayed_queue_prefix}-#{delay}ms", + durable: true, + arguments: { "x-dead-letter-exchange" => work_x.name, + "x-message-ttl" => delay } ). + bind( delay_x ) - delay_x.publish( payload, durable: true, - headers: options ) + delay_x.publish( payload, durable: true, + content_type: 'application/yaml', + headers: options ) + end + return end - work_q = channel.queue( QUEUE, durable: true ) - work_q.publish( payload, durable: true, - headers: options ) + with_work_exchange do |work_x, work_q| + work_x.publish( payload, durable: true, + content_type: 'application/yaml', + headers: options ) + end end def self.enqueue_error_payload( payload, options={} ) - mq = ::Bunny.new.tap { |bunny| bunny.start } - channel = mq.create_channel + with_channel do |channel| + error_q = channel.queue( RabbitWQ.configuration.error_queue, durable: true ) + error_q.publish( payload, durable: true, + content_type: 'application/yaml', + headers: options ) + end + end - error_q = channel.queue( ERROR_QUEUE, durable: true ) - error_q.publish( payload, durable: true, - headers: options ) + def self.with_work_exchange + with_channel do |channel| + begin + exchange = channel.direct( RabbitWQ.configuration.work_exchange, durable: true ) + channel.queue( RabbitWQ.configuration.work_queue, durable: true ).tap do |q| + q.bind( exchange ) + yield exchange, q + end + ensure + end + end end + + def self.with_channel + Bunny.new.tap do |b| + b.start + begin + b.create_channel.tap do |c| + yield c + end + ensure + b.stop + end + end + end + + #def self.with_exchange + #Bunny.new.tap do |b| + #b.start + #begin + #b.create_channel.tap do |c| + #queue = c.queue( 'replication', durable: true ) + #yield c.default_exchange, queue.name + #end + #ensure + #b.stop + #end + #end + #end end end