lib/rabbit_wq/work.rb in rabbit-wq-1.3.0 vs lib/rabbit_wq/work.rb in rabbit-wq-1.4.0

- old
+ new

@@ -1,10 +1,12 @@ require 'bunny' module RabbitWQ module Work + YAML_MIMETYPE = 'application/yaml' + def self.enqueue( worker, options={} ) payload = worker.to_yaml enqueue_payload( payload, options ) end @@ -12,57 +14,58 @@ delay = options.delete( :delay ) delay = nil if delay && delay < 5000 if delay with_channel do |channel| - delay_x = channel.direct( "#{RabbitWQ.configuration.delayed_exchange_prefix}-#{delay}ms", durable: true ) + delay_x = channel.direct( "#{config.delayed_exchange_prefix}-#{delay}ms", durable: true ) - work_x = channel.send( RabbitWQ.configuration.work_exchange_type, - RabbitWQ.configuration.work_exchange, + work_x = channel.send( config.work_exchange_type, + config.work_exchange, durable: true ) - channel.queue( "#{RabbitWQ.configuration.delayed_queue_prefix}-#{delay}ms", + channel.queue( "#{config.delayed_queue_prefix}-#{delay}ms", durable: true, arguments: { "x-dead-letter-exchange" => work_x.name, "x-message-ttl" => delay } ). bind( delay_x ) delay_x.publish( payload, durable: true, - content_type: 'application/yaml', + content_type: YAML_MIMETYPE, headers: options ) end return end - with_work_exchange do |work_x, work_q| - work_x.publish( payload, durable: true, - content_type: 'application/yaml', - headers: options ) + with_work_exchange do |work_x, work_pub_q, work_sub_q| + work_pub_q.publish( payload, durable: true, + content_type: YAML_MIMETYPE, + headers: options ) end end def self.enqueue_error_payload( payload, options={} ) with_channel do |channel| - error_q = channel.queue( RabbitWQ.configuration.error_queue, durable: true ) + error_q = channel.queue( config.error_queue, durable: true ) error_q.publish( payload, durable: true, - content_type: 'application/yaml', + content_type: YAML_MIMETYPE, headers: options ) end end def self.with_work_exchange with_channel do |channel| begin - exchange = channel.send( RabbitWQ.configuration.work_exchange_type, - RabbitWQ.configuration.work_exchange, + exchange = channel.send( config.work_exchange_type, + config.work_exchange, durable: true ) - channel.queue( RabbitWQ.configuration.work_queue, durable: true ).tap do |q| - q.bind( exchange ) - yield exchange, q - end + work_pub_q = channel.queue( config.work_publish_queue, durable: true ) + work_sub_q = channel.queue( config.work_subscribe_queue, durable: true ) + work_sub_q.bind( exchange ) + + yield exchange, work_pub_q, work_sub_q ensure end end end @@ -77,21 +80,11 @@ b.stop end end end - #def self.with_exchange - #Bunny.new.tap do |b| - #b.start - #begin - #b.create_channel.tap do |c| - #queue = c.queue( 'replication', durable: true ) - #yield c.default_exchange, queue.name - #end - #ensure - #b.stop - #end - #end - #end + def self.config + RabbitWQ.configuration + end end end