lib/sneakers/worker.rb in sneakers-0.1.1.pre vs lib/sneakers/worker.rb in sneakers-1.0.0

- old
+ new

@@ -2,21 +2,21 @@ require 'sneakers/support/utils' require 'timeout' module Sneakers module Worker - attr_reader :queue, :id + attr_reader :queue, :id, :opts # For now, a worker is hardly dependant on these concerns # (because it uses methods from them directly.) include Concerns::Logging include Concerns::Metrics - def initialize(queue=nil, pool=nil, opts=nil) - opts = self.class.queue_opts + def initialize(queue = nil, pool = nil, opts = {}) + opts = opts.merge(self.class.queue_opts || {}) queue_name = self.class.queue_name - opts = Sneakers::Config.merge(opts) + opts = Sneakers::CONFIG.merge(opts) @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] @pool = pool || Thread.pool(opts[:threads]) # XXX config threads @call_with_params = respond_to?(:work_with_params) @@ -32,16 +32,18 @@ def ack!; :ack end def reject!; :reject; end def requeue!; :requeue; end - def publish(msg, routing) - return unless routing[:to_queue] - @queue.exchange.publish(msg, :routing_key => routing[:to_queue]) + def publish(msg, opts) + to_queue = opts.delete(:to_queue) + opts[:routing_key] ||= to_queue + return unless opts[:routing_key] + @queue.exchange.publish(msg, opts) end - def do_work(hdr, props, msg, handler) + def do_work(delivery_info, metadata, msg, handler) worker_trace "Working off: #{msg}" @pool.process do res = nil error = nil @@ -49,41 +51,42 @@ begin metrics.increment("work.#{self.class.name}.started") Timeout.timeout(@timeout_after) do metrics.timing("work.#{self.class.name}.time") do if @call_with_params - res = work_with_params(msg, hdr, props) + res = work_with_params(msg, delivery_info, metadata) else res = work(msg) end end end rescue Timeout::Error res = :timeout - logger.error("timeout") + worker_error('timeout') rescue => ex res = :error error = ex - logger.error(ex) + worker_error('unexpected error', ex) end if @should_ack + if res == :ack # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(hdr.delivery_tag) + handler.acknowledge(delivery_info, metadata, msg) elsif res == :timeout - handler.timeout(hdr.delivery_tag) + handler.timeout(delivery_info, metadata, msg) elsif res == :error - handler.error(hdr.delivery_tag, error) + handler.error(delivery_info, metadata, msg, error) elsif res == :reject - handler.reject(hdr.delivery_tag) + handler.reject(delivery_info, metadata, msg) elsif res == :requeue - handler.reject(hdr.delivery_tag, true) + handler.reject(delivery_info, metadata, msg, true) else - handler.noop(hdr.delivery_tag) + handler.noop(delivery_info, metadata, msg) end - metrics.increment("work.#{self.class.name}.handled.#{res || 'reject'}") + metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") end metrics.increment("work.#{self.class.name}.ended") end #process end @@ -98,11 +101,27 @@ worker_trace "New worker: subscribing." @queue.subscribe(self) worker_trace "New worker: I'm alive." end + # Construct a log message with some standard prefix for this worker + def log_msg(msg) + "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + end + + # Helper to log an error message with an optional exception + def worker_error(msg, exception = nil) + s = log_msg(msg) + if exception + s += " [Exception error=#{exception.message.inspect} error_class=#{exception.class}" + s += " backtrace=#{exception.backtrace.take(50).join(',')}" unless exception.backtrace.nil? + s += "]" + end + logger.error(s) + end + def worker_trace(msg) - logger.debug "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" + logger.debug(log_msg(msg)) end def self.included(base) base.extend ClassMethods end