lib/sneakers/worker.rb in sneakers-0.1.0.pre vs lib/sneakers/worker.rb in sneakers-0.1.1.pre
- old
+ new
@@ -1,8 +1,7 @@
require 'sneakers/queue'
require 'sneakers/support/utils'
-require 'sneakers/support/queue_name'
require 'timeout'
module Sneakers
module Worker
attr_reader :queue, :id
@@ -13,15 +12,12 @@
include Concerns::Metrics
def initialize(queue=nil, pool=nil, opts=nil)
opts = self.class.queue_opts
queue_name = self.class.queue_name
-
opts = Sneakers::Config.merge(opts)
- queue_name = Support::QueueName.new(queue_name, opts).to_s
-
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
@pool = pool || Thread.pool(opts[:threads]) # XXX config threads
@call_with_params = respond_to?(:work_with_params)
@@ -38,11 +34,11 @@
def reject!; :reject; end
def requeue!; :requeue; end
def publish(msg, routing)
return unless routing[:to_queue]
- @queue.exchange.publish(msg, :routing_key => Support::QueueName.new(routing[:to_queue], @opts).to_s)
+ @queue.exchange.publish(msg, :routing_key => routing[:to_queue])
end
def do_work(hdr, props, msg, handler)
worker_trace "Working off: #{msg}"
@@ -117,9 +113,19 @@
attr_reader :queue_name
def from_queue(q, opts={})
@queue_name = q.to_s
@queue_opts = opts
+ end
+
+ def enqueue(msg)
+ publisher.publish(msg, :to_queue => @queue_name)
+ end
+
+ private
+
+ def publisher
+ @publisher ||= Sneakers::Publisher.new
end
end
end
end