lib/async_observer/queue.rb in beanstalker-0.0.1 vs lib/async_observer/queue.rb in beanstalker-0.1.1

- old
+ new

@@ -27,11 +27,11 @@ DEFAULT_FUZZ = 0 DEFAULT_DELAY = 0 DEFAULT_TTR = 120 DEFAULT_TUBE = 'default' - attr_accessor :queue, :app_version, :after_put + attr_accessor :queue, :after_put # This is a fake worker instance for running jobs synchronously. def sync_worker() require 'async_observer/worker' @sync_worker ||= AsyncObserver::Worker.new(binding) @@ -46,11 +46,11 @@ return 0, '0.0.0.0' end def put!(obj, pri=DEFAULT_PRI, delay=DEFAULT_DELAY, ttr=DEFAULT_TTR, tube=DEFAULT_TUBE) - return sync_run(obj) if (:direct.equal?(pri) or !queue) + return sync_run(obj) if pri == :direct || !queue queue.connect() queue.use(tube) info = [queue.yput(obj, pri, delay, ttr), queue.last_server] f = AsyncObserver::Queue.after_put f.call(*info) if f @@ -62,17 +62,24 @@ def put_call!(obj, sel, opts, args=[]) pri = opts.fetch(:pri, DEFAULT_PRI) fuzz = opts.fetch(:fuzz, DEFAULT_FUZZ) delay = opts.fetch(:delay, DEFAULT_DELAY) ttr = opts.fetch(:ttr, DEFAULT_TTR) - tube = opts.fetch(:tube, (app_version or DEFAULT_TUBE)) + tube = opts.fetch(:tube, DEFAULT_TUBE) worker_opts = opts.reject{|k,v| SUBMIT_OPTS.include?(k)} + interpolator = opts.fetch(:interpolator, nil) - pri = pri + rand(fuzz + 1) if !:direct.equal?(pri) + pri = pri + rand(fuzz + 1) if pri != :direct - code = gen(obj, sel, args) - RAILS_DEFAULT_LOGGER.info("put #{pri} #{code}") - put!(pkg(code, worker_opts), pri, delay, ttr, tube) + if interpolator + code = packed = interpolator + else + code = gen(obj, sel, args) + packed = pkg(code, worker_opts) + end + + RAILS_DEFAULT_LOGGER.info("put #{pri} #{code} to #{tube}") + put!(packed, pri, delay, ttr, tube) end def pkg(code, opts) opts.merge(:type => :rails, :code => code) end