lib/upperkut/strategy.rb in upperkut-0.5.2 vs lib/upperkut/strategy.rb in upperkut-0.6.0

- old
+ new

@@ -1,54 +1,73 @@ require_relative 'util' +require_relative 'redis_pool' module Upperkut class Strategy include Upperkut::Util - attr_accessor :worker, :redis + attr_reader :options - def initialize(worker, redis = Redis.new) - self.worker = worker - self.redis = redis + def initialize(worker, options = {}) + @options = options + @redis_options = options.fetch(:redis, {}) + @redis_pool = setup_redis_pool + @worker = worker end def push_items(items = []) items = [items] if items.is_a?(Hash) return false if items.empty? - redis.rpush(key, encode_json_items(items)) + redis do |conn| + conn.rpush(key, encode_json_items(items)) + end end def fetch_items(batch_size = 1000) stop = [batch_size, size].min - items = redis.multi do - stop.times do - redis.lpop(key) + items = redis do |conn| + conn.multi do + stop.times { conn.lpop(key) } end end decode_json_items(items) end def size - redis.llen(key) + redis do |conn| + conn.llen(key) + end end def latency - item = redis.lrange(key, 0, 0) + item = redis { |conn| conn.lrange(key, 0, 0) } item = decode_json_items(item).first return 0 unless item now = Time.now.to_f now - item.fetch('enqueued_at', Time.now).to_f end def clear - redis.del(key) + redis { |conn| conn.del(key) } end private + def setup_redis_pool + return @redis_options if @redis_options.is_a?(ConnectionPool) + RedisPool.new(options.fetch(:redis, {})).create + end + + def redis + raise ArgumentError, "requires a block" unless block_given? + @redis_pool.with do |conn| + yield conn + end + end + def key - "upperkut:buffers:#{to_underscore(worker.name)}" + "upperkut:buffers:#{to_underscore(@worker.name)}" end end end