lib/async_observer/queue.rb in beanstalker-0.0.1 vs lib/async_observer/queue.rb in beanstalker-0.1.1
- old
+ new
@@ -27,11 +27,11 @@
DEFAULT_FUZZ = 0
DEFAULT_DELAY = 0
DEFAULT_TTR = 120
DEFAULT_TUBE = 'default'
- attr_accessor :queue, :app_version, :after_put
+ attr_accessor :queue, :after_put
# This is a fake worker instance for running jobs synchronously.
def sync_worker()
require 'async_observer/worker'
@sync_worker ||= AsyncObserver::Worker.new(binding)
@@ -46,11 +46,11 @@
return 0, '0.0.0.0'
end
def put!(obj, pri=DEFAULT_PRI, delay=DEFAULT_DELAY, ttr=DEFAULT_TTR,
tube=DEFAULT_TUBE)
- return sync_run(obj) if (:direct.equal?(pri) or !queue)
+ return sync_run(obj) if pri == :direct || !queue
queue.connect()
queue.use(tube)
info = [queue.yput(obj, pri, delay, ttr), queue.last_server]
f = AsyncObserver::Queue.after_put
f.call(*info) if f
@@ -62,17 +62,24 @@
def put_call!(obj, sel, opts, args=[])
pri = opts.fetch(:pri, DEFAULT_PRI)
fuzz = opts.fetch(:fuzz, DEFAULT_FUZZ)
delay = opts.fetch(:delay, DEFAULT_DELAY)
ttr = opts.fetch(:ttr, DEFAULT_TTR)
- tube = opts.fetch(:tube, (app_version or DEFAULT_TUBE))
+ tube = opts.fetch(:tube, DEFAULT_TUBE)
worker_opts = opts.reject{|k,v| SUBMIT_OPTS.include?(k)}
+ interpolator = opts.fetch(:interpolator, nil)
- pri = pri + rand(fuzz + 1) if !:direct.equal?(pri)
+ pri = pri + rand(fuzz + 1) if pri != :direct
- code = gen(obj, sel, args)
- RAILS_DEFAULT_LOGGER.info("put #{pri} #{code}")
- put!(pkg(code, worker_opts), pri, delay, ttr, tube)
+ if interpolator
+ code = packed = interpolator
+ else
+ code = gen(obj, sel, args)
+ packed = pkg(code, worker_opts)
+ end
+
+ RAILS_DEFAULT_LOGGER.info("put #{pri} #{code} to #{tube}")
+ put!(packed, pri, delay, ttr, tube)
end
def pkg(code, opts)
opts.merge(:type => :rails, :code => code)
end