lib/sneakers/worker.rb in sneakers-0.0.5 vs lib/sneakers/worker.rb in sneakers-0.0.6

- old
+ new

@@ -15,32 +15,29 @@ def initialize(queue=nil, pool=nil, opts=nil) opts = self.class.queue_opts queue_name = self.class.queue_name opts = Sneakers::Config.merge(opts) + queue_name = Support::QueueName.new(queue_name, opts).to_s @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] @pool = pool || Thread.pool(opts[:threads]) # XXX config threads @queue = queue || Sneakers::Queue.new( queue_name, - :prefetch => opts[:prefetch], - :durable => opts[:durable], - :ack => @should_ack, - :heartbeat_interval => opts[:heartbeat_interval], - :exchange => opts[:exchange] + opts ) @opts = opts @id = Utils.make_worker_id(queue_name) end def ack!; :ack end - def nack!; :nack end def reject!; :reject; end + def requeue!; :requeue; end def publish(msg, routing) return unless routing[:to_queue] @queue.exchange.publish(msg, :routing_key => QueueName.new(routing[:to_queue], @opts).to_s) end @@ -74,11 +71,15 @@ handler.acknowledge(hdr.delivery_tag) elsif res == :timeout handler.timeout(hdr.delivery_tag) elsif res == :error handler.error(hdr.delivery_tag, error) - else + elsif res == :reject handler.reject(hdr.delivery_tag) + elsif res == :requeue + handler.reject(hdr.delivery_tag, true) + else + handler.noop(hdr.delivery_tag) end metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") end metrics.increment("work.#{self.class.name}.ended")