lib/sneakers/worker.rb in sneakers-2.3.5 vs lib/sneakers/worker.rb in sneakers-2.4.0

- old
+ new

@@ -8,10 +8,11 @@ # For now, a worker is hardly dependant on these concerns # (because it uses methods from them directly.) include Concerns::Logging include Concerns::Metrics + include Sneakers::ErrorReporter def initialize(queue = nil, pool = nil, opts = {}) opts = opts.merge(self.class.queue_opts || {}) queue_name = self.class.queue_name opts = Sneakers::CONFIG.merge(opts) @@ -57,17 +58,17 @@ else res = work(msg) end end end - rescue Timeout::Error + rescue Timeout::Error => ex res = :timeout - worker_error('timeout') + worker_error(ex, log_msg: log_msg(msg), message: msg) rescue => ex res = :error error = ex - worker_error('unexpected error', ex) + worker_error(ex, log_msg: log_msg(msg), message: msg) end if @should_ack if res == :ack @@ -90,10 +91,12 @@ metrics.increment("work.#{self.class.name}.ended") end #process end def stop + worker_trace "Stopping worker: shutting down thread pool." + @pool.shutdown worker_trace "Stopping worker: unsubscribing." @queue.unsubscribe worker_trace "Stopping worker: I'm gone." end @@ -106,21 +109,10 @@ # Construct a log message with some standard prefix for this worker def log_msg(msg) "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}" end - # Helper to log an error message with an optional exception - def worker_error(msg, exception = nil) - s = log_msg(msg) - if exception - s += " [Exception error=#{exception.message.inspect} error_class=#{exception.class}" - s += " backtrace=#{exception.backtrace.take(50).join(',')}" unless exception.backtrace.nil? - s += "]" - end - logger.error(s) - end - def worker_trace(msg) logger.debug(log_msg(msg)) end Classes = [] @@ -144,10 +136,10 @@ end private def publisher - @publisher ||= Sneakers::Publisher.new + @publisher ||= Sneakers::Publisher.new(queue_opts) end end end end