lib/sneakers/worker.rb in sneakers-2.3.5 vs lib/sneakers/worker.rb in sneakers-2.4.0
- old
+ new
@@ -8,10 +8,11 @@
# For now, a worker is hardly dependant on these concerns
# (because it uses methods from them directly.)
include Concerns::Logging
include Concerns::Metrics
+ include Sneakers::ErrorReporter
def initialize(queue = nil, pool = nil, opts = {})
opts = opts.merge(self.class.queue_opts || {})
queue_name = self.class.queue_name
opts = Sneakers::CONFIG.merge(opts)
@@ -57,17 +58,17 @@
else
res = work(msg)
end
end
end
- rescue Timeout::Error
+ rescue Timeout::Error => ex
res = :timeout
- worker_error('timeout')
+ worker_error(ex, log_msg: log_msg(msg), message: msg)
rescue => ex
res = :error
error = ex
- worker_error('unexpected error', ex)
+ worker_error(ex, log_msg: log_msg(msg), message: msg)
end
if @should_ack
if res == :ack
@@ -90,10 +91,12 @@
metrics.increment("work.#{self.class.name}.ended")
end #process
end
def stop
+ worker_trace "Stopping worker: shutting down thread pool."
+ @pool.shutdown
worker_trace "Stopping worker: unsubscribing."
@queue.unsubscribe
worker_trace "Stopping worker: I'm gone."
end
@@ -106,21 +109,10 @@
# Construct a log message with some standard prefix for this worker
def log_msg(msg)
"[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}"
end
- # Helper to log an error message with an optional exception
- def worker_error(msg, exception = nil)
- s = log_msg(msg)
- if exception
- s += " [Exception error=#{exception.message.inspect} error_class=#{exception.class}"
- s += " backtrace=#{exception.backtrace.take(50).join(',')}" unless exception.backtrace.nil?
- s += "]"
- end
- logger.error(s)
- end
-
def worker_trace(msg)
logger.debug(log_msg(msg))
end
Classes = []
@@ -144,10 +136,10 @@
end
private
def publisher
- @publisher ||= Sneakers::Publisher.new
+ @publisher ||= Sneakers::Publisher.new(queue_opts)
end
end
end
end