lib/rabbit_wq/work.rb in rabbit-wq-0.0.1 vs lib/rabbit_wq/work.rb in rabbit-wq-0.1.0

- old
+ new

@@ -2,29 +2,39 @@ module RabbitWQ module Work def self.enqueue( worker, options={} ) - options[:delay] = nil if options[:delay] && options[:delay] < 5000 + payload = worker.to_yaml + enqueue_payload( payload, options ) + end + def self.enqueue_payload( payload, options={} ) + delay = options.delete( :delay ) + delay = nil if delay && delay < 5000 + mq = ::Bunny.new.tap { |bunny| bunny.start } channel = mq.create_channel - if options[:delay] - delay_x = channel.direct( "#{DELAY_EXCHANGE_PREFIX}-#{options[:delay]}ms", durable: true ) - work_x = channel.direct( WORK_EXCHANGE, durable: true ) - queue = channel.queue( "#{DELAY_QUEUE_PREFIX}-#{options[:delay]}ms", - durable: true, - arguments: { "x-dead-letter-exchange" => work_x.name, - "x-message-ttl" => options[:delay] } ). - bind( delay_x ) + if delay + delay_x = channel.direct( "#{DELAY_EXCHANGE_PREFIX}-#{delay}ms", durable: true ) + work_x = channel.direct( WORK_EXCHANGE, durable: true ) - delay_x.publish( 'hello', durable: true ) + channel.queue( "#{DELAY_QUEUE_PREFIX}-#{delay}ms", + durable: true, + arguments: { "x-dead-letter-exchange" => work_x.name, + "x-message-ttl" => delay } ). + bind( delay_x ) + + delay_x.publish( payload, durable: true, + headers: options ) return end work_q = channel.queue( QUEUE, durable: true ) - work_q.publish( 'hello', durable: true ) + work_q.publish( payload, durable: true, + headers: options ) end + end end