lib/sneakers/worker.rb in sneakers-2.5.0 vs lib/sneakers/worker.rb in sneakers-2.6.0
- old
+ new
@@ -17,12 +17,13 @@
queue_name = self.class.queue_name
opts = Sneakers::CONFIG.merge(opts)
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
- @pool = pool || Thread.pool(opts[:threads]) # XXX config threads
+ @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads])
@call_with_params = respond_to?(:work_with_params)
+ @content_type = opts[:content_type]
@queue = queue || Sneakers::Queue.new(
queue_name,
opts
)
@@ -37,28 +38,29 @@
def publish(msg, opts)
to_queue = opts.delete(:to_queue)
opts[:routing_key] ||= to_queue
return unless opts[:routing_key]
- @queue.exchange.publish(msg, opts)
+ @queue.exchange.publish(Sneakers::ContentType.serialize(msg, opts[:content_type]), opts)
end
def do_work(delivery_info, metadata, msg, handler)
worker_trace "Working off: #{msg.inspect}"
- @pool.process do
+ @pool.post do
res = nil
error = nil
begin
metrics.increment("work.#{self.class.name}.started")
Timeout.timeout(@timeout_after, WorkerTimeout) do
metrics.timing("work.#{self.class.name}.time") do
+ deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type])
if @call_with_params
- res = work_with_params(msg, delivery_info, metadata)
+ res = work_with_params(deserialized_msg, delivery_info, metadata)
else
- res = work(msg)
+ res = work(deserialized_msg)
end
end
end
rescue WorkerTimeout => ex
res = :timeout
@@ -89,18 +91,19 @@
end
metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
end
metrics.increment("work.#{self.class.name}.ended")
- end #process
+ end #post
end
def stop
- worker_trace "Stopping worker: shutting down thread pool."
- @pool.shutdown
worker_trace "Stopping worker: unsubscribing."
@queue.unsubscribe
+ worker_trace "Stopping worker: shutting down thread pool."
+ @pool.shutdown
+ @pool.wait_for_termination
worker_trace "Stopping worker: I'm gone."
end
def run
worker_trace "New worker: subscribing."
@@ -133,9 +136,10 @@
@queue_opts = opts
end
def enqueue(msg, opts={})
opts[:routing_key] ||= @queue_opts[:routing_key]
+ opts[:content_type] ||= @queue_opts[:content_type]
opts[:to_queue] ||= @queue_name
publisher.publish(msg, opts)
end