lib/sneakers/worker.rb in sneakers-0.0.7 vs lib/sneakers/worker.rb in sneakers-0.1.0.pre

- old
+ new

@@ -21,10 +21,11 @@ queue_name = Support::QueueName.new(queue_name, opts).to_s @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] @pool = pool || Thread.pool(opts[:threads]) # XXX config threads + @call_with_params = respond_to?(:work_with_params) @queue = queue || Sneakers::Queue.new( queue_name, opts ) @@ -37,11 +38,11 @@ def reject!; :reject; end def requeue!; :requeue; end def publish(msg, routing) return unless routing[:to_queue] - @queue.exchange.publish(msg, :routing_key => QueueName.new(routing[:to_queue], @opts).to_s) + @queue.exchange.publish(msg, :routing_key => Support::QueueName.new(routing[:to_queue], @opts).to_s) end def do_work(hdr, props, msg, handler) worker_trace "Working off: #{msg}" @@ -51,10 +52,14 @@ begin metrics.increment("work.#{self.class.name}.started") Timeout.timeout(@timeout_after) do metrics.timing("work.#{self.class.name}.time") do - res = work(msg) + if @call_with_params + res = work_with_params(msg, hdr, props) + else + res = work(msg) + end end end rescue Timeout::Error res = :timeout logger.error("timeout")