lib/sneakers/worker.rb in sneakers-2.5.0 vs lib/sneakers/worker.rb in sneakers-2.6.0

- old
+ new

@@ -17,12 +17,13 @@ queue_name = self.class.queue_name opts = Sneakers::CONFIG.merge(opts) @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] - @pool = pool || Thread.pool(opts[:threads]) # XXX config threads + @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads]) @call_with_params = respond_to?(:work_with_params) + @content_type = opts[:content_type] @queue = queue || Sneakers::Queue.new( queue_name, opts ) @@ -37,28 +38,29 @@ def publish(msg, opts) to_queue = opts.delete(:to_queue) opts[:routing_key] ||= to_queue return unless opts[:routing_key] - @queue.exchange.publish(msg, opts) + @queue.exchange.publish(Sneakers::ContentType.serialize(msg, opts[:content_type]), opts) end def do_work(delivery_info, metadata, msg, handler) worker_trace "Working off: #{msg.inspect}" - @pool.process do + @pool.post do res = nil error = nil begin metrics.increment("work.#{self.class.name}.started") Timeout.timeout(@timeout_after, WorkerTimeout) do metrics.timing("work.#{self.class.name}.time") do + deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type]) if @call_with_params - res = work_with_params(msg, delivery_info, metadata) + res = work_with_params(deserialized_msg, delivery_info, metadata) else - res = work(msg) + res = work(deserialized_msg) end end end rescue WorkerTimeout => ex res = :timeout @@ -89,18 +91,19 @@ end metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") end metrics.increment("work.#{self.class.name}.ended") - end #process + end #post end def stop - worker_trace "Stopping worker: shutting down thread pool." - @pool.shutdown worker_trace "Stopping worker: unsubscribing." @queue.unsubscribe + worker_trace "Stopping worker: shutting down thread pool." + @pool.shutdown + @pool.wait_for_termination worker_trace "Stopping worker: I'm gone." end def run worker_trace "New worker: subscribing." @@ -133,9 +136,10 @@ @queue_opts = opts end def enqueue(msg, opts={}) opts[:routing_key] ||= @queue_opts[:routing_key] + opts[:content_type] ||= @queue_opts[:content_type] opts[:to_queue] ||= @queue_name publisher.publish(msg, opts) end