lib/sneakers/worker.rb in sneakers-2.6.0 vs lib/sneakers/worker.rb in sneakers-2.7.0
- old
+ new
@@ -17,11 +17,11 @@
queue_name = self.class.queue_name
opts = Sneakers::CONFIG.merge(opts)
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
- @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads])
+ @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads])
@call_with_params = respond_to?(:work_with_params)
@content_type = opts[:content_type]
@queue = queue || Sneakers::Queue.new(
queue_name,
@@ -45,57 +45,61 @@
def do_work(delivery_info, metadata, msg, handler)
worker_trace "Working off: #{msg.inspect}"
@pool.post do
- res = nil
- error = nil
+ process_work(delivery_info, metadata, msg, handler)
+ end
+ end
- begin
- metrics.increment("work.#{self.class.name}.started")
- Timeout.timeout(@timeout_after, WorkerTimeout) do
- metrics.timing("work.#{self.class.name}.time") do
- deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type])
- if @call_with_params
- res = work_with_params(deserialized_msg, delivery_info, metadata)
- else
- res = work(deserialized_msg)
- end
+ def process_work(delivery_info, metadata, msg, handler)
+ res = nil
+ error = nil
+
+ begin
+ metrics.increment("work.#{self.class.name}.started")
+ Timeout.timeout(@timeout_after, WorkerTimeout) do
+ metrics.timing("work.#{self.class.name}.time") do
+ deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type])
+ if @call_with_params
+ res = work_with_params(deserialized_msg, delivery_info, metadata)
+ else
+ res = work(deserialized_msg)
end
end
- rescue WorkerTimeout => ex
- res = :timeout
- worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
- message: msg, delivery_info: delivery_info, metadata: metadata)
- rescue => ex
- res = :error
- error = ex
- worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
- message: msg, delivery_info: delivery_info, metadata: metadata)
end
+ rescue WorkerTimeout => ex
+ res = :timeout
+ worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
+ message: msg, delivery_info: delivery_info, metadata: metadata)
+ rescue => ex
+ res = :error
+ error = ex
+ worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
+ message: msg, delivery_info: delivery_info, metadata: metadata)
+ end
- if @should_ack
+ if @should_ack
- if res == :ack
- # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
- handler.acknowledge(delivery_info, metadata, msg)
- elsif res == :timeout
- handler.timeout(delivery_info, metadata, msg)
- elsif res == :error
- handler.error(delivery_info, metadata, msg, error)
- elsif res == :reject
- handler.reject(delivery_info, metadata, msg)
- elsif res == :requeue
- handler.reject(delivery_info, metadata, msg, true)
- else
- handler.noop(delivery_info, metadata, msg)
- end
- metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
+ if res == :ack
+ # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
+ handler.acknowledge(delivery_info, metadata, msg)
+ elsif res == :timeout
+ handler.timeout(delivery_info, metadata, msg)
+ elsif res == :error
+ handler.error(delivery_info, metadata, msg, error)
+ elsif res == :reject
+ handler.reject(delivery_info, metadata, msg)
+ elsif res == :requeue
+ handler.reject(delivery_info, metadata, msg, true)
+ else
+ handler.noop(delivery_info, metadata, msg)
end
+ metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
+ end
- metrics.increment("work.#{self.class.name}.ended")
- end #post
+ metrics.increment("work.#{self.class.name}.ended")
end
def stop
worker_trace "Stopping worker: unsubscribing."
@queue.unsubscribe
@@ -150,6 +154,5 @@
@publisher ||= Sneakers::Publisher.new(queue_opts)
end
end
end
end
-