lib/sneakers/worker.rb in sneakers-0.0.7 vs lib/sneakers/worker.rb in sneakers-0.1.0.pre
- old
+ new
@@ -21,10 +21,11 @@
queue_name = Support::QueueName.new(queue_name, opts).to_s
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
@pool = pool || Thread.pool(opts[:threads]) # XXX config threads
+ @call_with_params = respond_to?(:work_with_params)
@queue = queue || Sneakers::Queue.new(
queue_name,
opts
)
@@ -37,11 +38,11 @@
def reject!; :reject; end
def requeue!; :requeue; end
def publish(msg, routing)
return unless routing[:to_queue]
- @queue.exchange.publish(msg, :routing_key => QueueName.new(routing[:to_queue], @opts).to_s)
+ @queue.exchange.publish(msg, :routing_key => Support::QueueName.new(routing[:to_queue], @opts).to_s)
end
def do_work(hdr, props, msg, handler)
worker_trace "Working off: #{msg}"
@@ -51,10 +52,14 @@
begin
metrics.increment("work.#{self.class.name}.started")
Timeout.timeout(@timeout_after) do
metrics.timing("work.#{self.class.name}.time") do
- res = work(msg)
+ if @call_with_params
+ res = work_with_params(msg, hdr, props)
+ else
+ res = work(msg)
+ end
end
end
rescue Timeout::Error
res = :timeout
logger.error("timeout")