lib/sneakers/worker.rb in sneakers-0.0.5 vs lib/sneakers/worker.rb in sneakers-0.0.6
- old
+ new
@@ -15,32 +15,29 @@
def initialize(queue=nil, pool=nil, opts=nil)
opts = self.class.queue_opts
queue_name = self.class.queue_name
opts = Sneakers::Config.merge(opts)
+
queue_name = Support::QueueName.new(queue_name, opts).to_s
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
@pool = pool || Thread.pool(opts[:threads]) # XXX config threads
@queue = queue || Sneakers::Queue.new(
queue_name,
- :prefetch => opts[:prefetch],
- :durable => opts[:durable],
- :ack => @should_ack,
- :heartbeat_interval => opts[:heartbeat_interval],
- :exchange => opts[:exchange]
+ opts
)
@opts = opts
@id = Utils.make_worker_id(queue_name)
end
def ack!; :ack end
- def nack!; :nack end
def reject!; :reject; end
+ def requeue!; :requeue; end
def publish(msg, routing)
return unless routing[:to_queue]
@queue.exchange.publish(msg, :routing_key => QueueName.new(routing[:to_queue], @opts).to_s)
end
@@ -74,11 +71,15 @@
handler.acknowledge(hdr.delivery_tag)
elsif res == :timeout
handler.timeout(hdr.delivery_tag)
elsif res == :error
handler.error(hdr.delivery_tag, error)
- else
+ elsif res == :reject
handler.reject(hdr.delivery_tag)
+ elsif res == :requeue
+ handler.reject(hdr.delivery_tag, true)
+ else
+ handler.noop(hdr.delivery_tag)
end
metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}")
end
metrics.increment("work.#{self.class.name}.ended")