lib/sneakers/worker.rb in sneakers-0.1.0.pre vs lib/sneakers/worker.rb in sneakers-0.1.1.pre

- old
+ new

@@ -1,8 +1,7 @@ require 'sneakers/queue' require 'sneakers/support/utils' -require 'sneakers/support/queue_name' require 'timeout' module Sneakers module Worker attr_reader :queue, :id @@ -13,15 +12,12 @@ include Concerns::Metrics def initialize(queue=nil, pool=nil, opts=nil) opts = self.class.queue_opts queue_name = self.class.queue_name - opts = Sneakers::Config.merge(opts) - queue_name = Support::QueueName.new(queue_name, opts).to_s - @should_ack = opts[:ack] @timeout_after = opts[:timeout_job_after] @pool = pool || Thread.pool(opts[:threads]) # XXX config threads @call_with_params = respond_to?(:work_with_params) @@ -38,11 +34,11 @@ def reject!; :reject; end def requeue!; :requeue; end def publish(msg, routing) return unless routing[:to_queue] - @queue.exchange.publish(msg, :routing_key => Support::QueueName.new(routing[:to_queue], @opts).to_s) + @queue.exchange.publish(msg, :routing_key => routing[:to_queue]) end def do_work(hdr, props, msg, handler) worker_trace "Working off: #{msg}" @@ -117,9 +113,19 @@ attr_reader :queue_name def from_queue(q, opts={}) @queue_name = q.to_s @queue_opts = opts + end + + def enqueue(msg) + publisher.publish(msg, :to_queue => @queue_name) + end + + private + + def publisher + @publisher ||= Sneakers::Publisher.new end end end end