lib/sneakers/worker.rb in sneakers-2.6.0 vs lib/sneakers/worker.rb in sneakers-2.7.0

- old
+ new

@@ -17,11 +17,11 @@ queue_name = self.class.queue_name opts = Sneakers::CONFIG.merge(opts) @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] - @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads]) + @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads]) @call_with_params = respond_to?(:work_with_params) @content_type = opts[:content_type] @queue = queue || Sneakers::Queue.new( queue_name, @@ -45,57 +45,61 @@ def do_work(delivery_info, metadata, msg, handler) worker_trace "Working off: #{msg.inspect}" @pool.post do - res = nil - error = nil + process_work(delivery_info, metadata, msg, handler) + end + end - begin - metrics.increment("work.#{self.class.name}.started") - Timeout.timeout(@timeout_after, WorkerTimeout) do - metrics.timing("work.#{self.class.name}.time") do - deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type]) - if @call_with_params - res = work_with_params(deserialized_msg, delivery_info, metadata) - else - res = work(deserialized_msg) - end + def process_work(delivery_info, metadata, msg, handler) + res = nil + error = nil + + begin + metrics.increment("work.#{self.class.name}.started") + Timeout.timeout(@timeout_after, WorkerTimeout) do + metrics.timing("work.#{self.class.name}.time") do + deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type]) + if @call_with_params + res = work_with_params(deserialized_msg, delivery_info, metadata) + else + res = work(deserialized_msg) end end - rescue WorkerTimeout => ex - res = :timeout - worker_error(ex, log_msg: log_msg(msg), class: self.class.name, - message: msg, delivery_info: delivery_info, metadata: metadata) - rescue => ex - res = :error - error = ex - worker_error(ex, log_msg: log_msg(msg), class: self.class.name, - message: msg, delivery_info: delivery_info, metadata: metadata) end + rescue WorkerTimeout => ex + res = :timeout + worker_error(ex, log_msg: log_msg(msg), class: self.class.name, + message: msg, delivery_info: delivery_info, metadata: metadata) + rescue => ex + res = :error + error = ex + worker_error(ex, log_msg: log_msg(msg), class: self.class.name, + message: msg, delivery_info: delivery_info, metadata: metadata) + end - if @should_ack + if @should_ack - if res == :ack - # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(delivery_info, metadata, msg) - elsif res == :timeout - handler.timeout(delivery_info, metadata, msg) - elsif res == :error - handler.error(delivery_info, metadata, msg, error) - elsif res == :reject - handler.reject(delivery_info, metadata, msg) - elsif res == :requeue - handler.reject(delivery_info, metadata, msg, true) - else - handler.noop(delivery_info, metadata, msg) - end - metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") + if res == :ack + # note to future-self. never acknowledge multiple (multiple=true) messages under threads. + handler.acknowledge(delivery_info, metadata, msg) + elsif res == :timeout + handler.timeout(delivery_info, metadata, msg) + elsif res == :error + handler.error(delivery_info, metadata, msg, error) + elsif res == :reject + handler.reject(delivery_info, metadata, msg) + elsif res == :requeue + handler.reject(delivery_info, metadata, msg, true) + else + handler.noop(delivery_info, metadata, msg) end + metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") + end - metrics.increment("work.#{self.class.name}.ended") - end #post + metrics.increment("work.#{self.class.name}.ended") end def stop worker_trace "Stopping worker: unsubscribing." @queue.unsubscribe @@ -150,6 +154,5 @@ @publisher ||= Sneakers::Publisher.new(queue_opts) end end end end -