lib/sneakers/worker.rb in sneakers-0.1.1.pre vs lib/sneakers/worker.rb in sneakers-1.0.0
- old
+ new
@@ -2,21 +2,21 @@
require 'sneakers/support/utils'
require 'timeout'
module Sneakers
module Worker
- attr_reader :queue, :id
+ attr_reader :queue, :id, :opts
# For now, a worker is hardly dependant on these concerns
# (because it uses methods from them directly.)
include Concerns::Logging
include Concerns::Metrics
- def initialize(queue=nil, pool=nil, opts=nil)
- opts = self.class.queue_opts
+ def initialize(queue = nil, pool = nil, opts = {})
+ opts = opts.merge(self.class.queue_opts || {})
queue_name = self.class.queue_name
- opts = Sneakers::Config.merge(opts)
+ opts = Sneakers::CONFIG.merge(opts)
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
@pool = pool || Thread.pool(opts[:threads]) # XXX config threads
@call_with_params = respond_to?(:work_with_params)
@@ -32,16 +32,18 @@
def ack!; :ack end
def reject!; :reject; end
def requeue!; :requeue; end
- def publish(msg, routing)
- return unless routing[:to_queue]
- @queue.exchange.publish(msg, :routing_key => routing[:to_queue])
+ def publish(msg, opts)
+ to_queue = opts.delete(:to_queue)
+ opts[:routing_key] ||= to_queue
+ return unless opts[:routing_key]
+ @queue.exchange.publish(msg, opts)
end
- def do_work(hdr, props, msg, handler)
+ def do_work(delivery_info, metadata, msg, handler)
worker_trace "Working off: #{msg}"
@pool.process do
res = nil
error = nil
@@ -49,41 +51,42 @@
begin
metrics.increment("work.#{self.class.name}.started")
Timeout.timeout(@timeout_after) do
metrics.timing("work.#{self.class.name}.time") do
if @call_with_params
- res = work_with_params(msg, hdr, props)
+ res = work_with_params(msg, delivery_info, metadata)
else
res = work(msg)
end
end
end
rescue Timeout::Error
res = :timeout
- logger.error("timeout")
+ worker_error('timeout')
rescue => ex
res = :error
error = ex
- logger.error(ex)
+ worker_error('unexpected error', ex)
end
if @should_ack
+
if res == :ack
# note to future-self. never acknowledge multiple (multiple=true) messages under threads.
- handler.acknowledge(hdr.delivery_tag)
+ handler.acknowledge(delivery_info, metadata, msg)
elsif res == :timeout
- handler.timeout(hdr.delivery_tag)
+ handler.timeout(delivery_info, metadata, msg)
elsif res == :error
- handler.error(hdr.delivery_tag, error)
+ handler.error(delivery_info, metadata, msg, error)
elsif res == :reject
- handler.reject(hdr.delivery_tag)
+ handler.reject(delivery_info, metadata, msg)
elsif res == :requeue
- handler.reject(hdr.delivery_tag, true)
+ handler.reject(delivery_info, metadata, msg, true)
else
- handler.noop(hdr.delivery_tag)
+ handler.noop(delivery_info, metadata, msg)
end
- metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}")
+ metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
end
metrics.increment("work.#{self.class.name}.ended")
end #process
end
@@ -98,11 +101,27 @@
worker_trace "New worker: subscribing."
@queue.subscribe(self)
worker_trace "New worker: I'm alive."
end
+ # Construct a log message with some standard prefix for this worker
+ def log_msg(msg)
+ "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}"
+ end
+
+ # Helper to log an error message with an optional exception
+ def worker_error(msg, exception = nil)
+ s = log_msg(msg)
+ if exception
+ s += " [Exception error=#{exception.message.inspect} error_class=#{exception.class}"
+ s += " backtrace=#{exception.backtrace.take(50).join(',')}" unless exception.backtrace.nil?
+ s += "]"
+ end
+ logger.error(s)
+ end
+
def worker_trace(msg)
- logger.debug "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}"
+ logger.debug(log_msg(msg))
end
def self.included(base)
base.extend ClassMethods
end